1use std::{io, mem, net};
2
3use byteorder::{NetworkEndian, ReadBytesExt};
4use cyphernet::addr::{tor, Addr, HostName, NetAddr};
5use radicle::git::Oid;
6use radicle::node::Address;
7
8use crate::prelude::*;
9use crate::service::message::*;
10use crate::wire;
11use crate::wire::{Decode, Encode};
12
/// Numeric identifier of every message kind handled by this module.
///
/// The identifier is written as a `u16` at the start of each encoded
/// `Message`, and it is the first thing read back when decoding one.
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    /// Announcement carrying a `NodeAnnouncement`.
    NodeAnnouncement = 2,
    /// Announcement carrying an `InventoryAnnouncement`.
    InventoryAnnouncement = 4,
    /// Announcement carrying a `RefsAnnouncement`.
    RefsAnnouncement = 6,
    /// Subscription message.
    Subscribe = 8,
    /// Ping message.
    Ping = 10,
    /// Pong message.
    Pong = 12,
    /// Informational message.
    Info = 14,
}

impl From<MessageType> for u16 {
    /// Returns the discriminant of the given message type.
    fn from(ty: MessageType) -> Self {
        ty as u16
    }
}

impl TryFrom<u16> for MessageType {
    /// An unrecognized identifier is handed back unchanged.
    type Error = u16;

    /// Looks up the message type whose discriminant equals `id`.
    fn try_from(id: u16) -> Result<Self, Self::Error> {
        // Every variant of the enum; keep in sync when adding a message type.
        const KNOWN: [MessageType; 7] = [
            MessageType::NodeAnnouncement,
            MessageType::InventoryAnnouncement,
            MessageType::RefsAnnouncement,
            MessageType::Subscribe,
            MessageType::Ping,
            MessageType::Pong,
            MessageType::Info,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|ty| u16::from(*ty) == id)
            .ok_or(id)
    }
}
48
49impl Message {
50 pub const MAX_SIZE: wire::Size =
52 wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
53
54 pub fn type_id(&self) -> u16 {
55 match self {
56 Self::Subscribe { .. } => MessageType::Subscribe,
57 Self::Announcement(Announcement { message, .. }) => match message {
58 AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
59 AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
60 AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
61 },
62 Self::Info(_) => MessageType::Info,
63 Self::Ping { .. } => MessageType::Ping,
64 Self::Pong { .. } => MessageType::Pong,
65 }
66 .into()
67 }
68}
69
70impl netservices::Frame for Message {
71 type Error = wire::Error;
72
73 fn unmarshall(mut reader: impl io::Read) -> Result<Option<Self>, Self::Error> {
74 match Message::decode(&mut reader) {
75 Ok(msg) => Ok(Some(msg)),
76 Err(wire::Error::Io(_)) => Ok(None),
77 Err(err) => Err(err),
78 }
79 }
80
81 fn marshall(&self, mut writer: impl io::Write) -> Result<usize, Self::Error> {
82 self.encode(&mut writer).map_err(wire::Error::from)
83 }
84}
85
/// Tag identifying which kind of host an encoded address carries.
///
/// The tag is written as a single byte in front of the host data.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressType {
    /// IPv4 address.
    Ipv4 = 1,
    /// IPv6 address.
    Ipv6 = 2,
    /// DNS host name.
    Dns = 3,
    /// Tor onion address.
    Onion = 4,
}

impl From<AddressType> for u8 {
    /// Returns the discriminant of the given address type.
    fn from(ty: AddressType) -> Self {
        ty as u8
    }
}
101
102impl From<&Address> for AddressType {
103 fn from(a: &Address) -> Self {
104 match a.host {
105 HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
106 HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
107 HostName::Dns(_) => AddressType::Dns,
108 HostName::Tor(_) => AddressType::Onion,
109 _ => todo!(), }
111 }
112}
113
114impl TryFrom<u8> for AddressType {
115 type Error = u8;
116
117 fn try_from(other: u8) -> Result<Self, Self::Error> {
118 match other {
119 1 => Ok(AddressType::Ipv4),
120 2 => Ok(AddressType::Ipv6),
121 3 => Ok(AddressType::Dns),
122 4 => Ok(AddressType::Onion),
123 _ => Err(other),
124 }
125 }
126}
127
128impl wire::Encode for AnnouncementMessage {
129 fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
130 match self {
131 Self::Node(ann) => ann.encode(writer),
132 Self::Inventory(ann) => ann.encode(writer),
133 Self::Refs(ann) => ann.encode(writer),
134 }
135 }
136}
137
138impl wire::Encode for RefsAnnouncement {
139 fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
140 let mut n = 0;
141
142 n += self.rid.encode(writer)?;
143 n += self.refs.encode(writer)?;
144 n += self.timestamp.encode(writer)?;
145
146 Ok(n)
147 }
148}
149
150impl wire::Decode for RefsAnnouncement {
151 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
152 let rid = RepoId::decode(reader)?;
153 let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(reader)?;
154 let timestamp = Timestamp::decode(reader)?;
155
156 Ok(Self {
157 rid,
158 refs,
159 timestamp,
160 })
161 }
162}
163
164impl wire::Encode for InventoryAnnouncement {
165 fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
166 let mut n = 0;
167
168 n += self.inventory.encode(writer)?;
169 n += self.timestamp.encode(writer)?;
170
171 Ok(n)
172 }
173}
174
175impl wire::Decode for InventoryAnnouncement {
176 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
177 let inventory = BoundedVec::decode(reader)?;
178 let timestamp = Timestamp::decode(reader)?;
179
180 Ok(Self {
181 inventory,
182 timestamp,
183 })
184 }
185}
186
/// Numeric identifier of each kind of `Info` message.
///
/// Although the enum itself is `repr(u8)`, its conversions go through `u16`,
/// which is also the width used when the identifier is encoded and decoded.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InfoType {
    /// Identifier of `Info::RefsAlreadySynced`.
    RefsAlreadySynced = 1,
}

impl From<InfoType> for u16 {
    /// Returns the discriminant of the given info type, widened to `u16`.
    fn from(ty: InfoType) -> Self {
        ty as u16
    }
}

impl TryFrom<u16> for InfoType {
    /// An unrecognized identifier is handed back unchanged.
    type Error = u16;

    /// Looks up the info type whose discriminant equals `id`.
    fn try_from(id: u16) -> Result<Self, Self::Error> {
        if id == InfoType::RefsAlreadySynced as u16 {
            Ok(Self::RefsAlreadySynced)
        } else {
            Err(id)
        }
    }
}
211
212impl From<Info> for InfoType {
213 fn from(info: Info) -> Self {
214 (&info).into()
215 }
216}
217
218impl From<&Info> for InfoType {
219 fn from(info: &Info) -> Self {
220 match info {
221 Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
222 }
223 }
224}
225
226impl wire::Encode for Info {
227 fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
228 let mut n = 0;
229 n += u16::from(InfoType::from(self)).encode(writer)?;
230 match self {
231 Info::RefsAlreadySynced { rid, at } => {
232 n += rid.encode(writer)?;
233 n += at.encode(writer)?;
234 }
235 }
236
237 Ok(n)
238 }
239}
240
241impl wire::Decode for Info {
242 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
243 let info_type = reader.read_u16::<NetworkEndian>()?;
244
245 match InfoType::try_from(info_type) {
246 Ok(InfoType::RefsAlreadySynced) => {
247 let rid = RepoId::decode(reader)?;
248 let at = Oid::decode(reader)?;
249
250 Ok(Self::RefsAlreadySynced { rid, at })
251 }
252 Err(other) => Err(wire::Error::UnknownInfoType(other)),
253 }
254 }
255}
256
257impl wire::Encode for Message {
258 fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
259 let mut n = self.type_id().encode(writer)?;
260
261 match self {
262 Self::Subscribe(Subscribe {
263 filter,
264 since,
265 until,
266 }) => {
267 n += filter.encode(writer)?;
268 n += since.encode(writer)?;
269 n += until.encode(writer)?;
270 }
271 Self::Announcement(Announcement {
272 node,
273 message,
274 signature,
275 }) => {
276 n += node.encode(writer)?;
277 n += signature.encode(writer)?;
278 n += message.encode(writer)?;
279 }
280 Self::Info(info) => {
281 n += info.encode(writer)?;
282 }
283 Self::Ping(Ping { ponglen, zeroes }) => {
284 n += ponglen.encode(writer)?;
285 n += zeroes.encode(writer)?;
286 }
287 Self::Pong { zeroes } => {
288 n += zeroes.encode(writer)?;
289 }
290 }
291
292 if n > wire::Size::MAX as usize {
293 return Err(io::Error::new(
294 io::ErrorKind::InvalidData,
295 "Message exceeds maximum size",
296 ));
297 }
298 Ok(n)
299 }
300}
301
302impl wire::Decode for Message {
303 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
304 let type_id = reader.read_u16::<NetworkEndian>()?;
305
306 match MessageType::try_from(type_id) {
307 Ok(MessageType::Subscribe) => {
308 let filter = Filter::decode(reader)?;
309 let since = Timestamp::decode(reader)?;
310 let until = Timestamp::decode(reader)?;
311
312 Ok(Self::Subscribe(Subscribe {
313 filter,
314 since,
315 until,
316 }))
317 }
318 Ok(MessageType::NodeAnnouncement) => {
319 let node = NodeId::decode(reader)?;
320 let signature = Signature::decode(reader)?;
321 let message = NodeAnnouncement::decode(reader)?.into();
322
323 Ok(Announcement {
324 node,
325 message,
326 signature,
327 }
328 .into())
329 }
330 Ok(MessageType::InventoryAnnouncement) => {
331 let node = NodeId::decode(reader)?;
332 let signature = Signature::decode(reader)?;
333 let message = InventoryAnnouncement::decode(reader)?.into();
334
335 Ok(Announcement {
336 node,
337 message,
338 signature,
339 }
340 .into())
341 }
342 Ok(MessageType::RefsAnnouncement) => {
343 let node = NodeId::decode(reader)?;
344 let signature = Signature::decode(reader)?;
345 let message = RefsAnnouncement::decode(reader)?.into();
346
347 Ok(Announcement {
348 node,
349 message,
350 signature,
351 }
352 .into())
353 }
354 Ok(MessageType::Info) => {
355 let info = Info::decode(reader)?;
356 Ok(Self::Info(info))
357 }
358 Ok(MessageType::Ping) => {
359 let ponglen = u16::decode(reader)?;
360 let zeroes = ZeroBytes::decode(reader)?;
361 Ok(Self::Ping(Ping { ponglen, zeroes }))
362 }
363 Ok(MessageType::Pong) => {
364 let zeroes = ZeroBytes::decode(reader)?;
365 Ok(Self::Pong { zeroes })
366 }
367 Err(other) => Err(wire::Error::UnknownMessageType(other)),
368 }
369 }
370}
371
372impl wire::Encode for Address {
373 fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
374 let mut n = 0;
375
376 match self.host {
377 HostName::Ip(net::IpAddr::V4(ip)) => {
378 n += u8::from(AddressType::Ipv4).encode(writer)?;
379 n += ip.octets().encode(writer)?;
380 }
381 HostName::Ip(net::IpAddr::V6(ip)) => {
382 n += u8::from(AddressType::Ipv6).encode(writer)?;
383 n += ip.octets().encode(writer)?;
384 }
385 HostName::Dns(ref dns) => {
386 n += u8::from(AddressType::Dns).encode(writer)?;
387 n += dns.encode(writer)?;
388 }
389 HostName::Tor(addr) => {
390 n += u8::from(AddressType::Onion).encode(writer)?;
391 n += addr.encode(writer)?;
392 }
393 _ => {
394 return Err(io::ErrorKind::Unsupported.into());
395 }
396 }
397 n += self.port().encode(writer)?;
398
399 Ok(n)
400 }
401}
402
403impl wire::Decode for Address {
404 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
405 let addrtype = reader.read_u8()?;
406 let host = match AddressType::try_from(addrtype) {
407 Ok(AddressType::Ipv4) => {
408 let octets: [u8; 4] = wire::Decode::decode(reader)?;
409 let ip = net::Ipv4Addr::from(octets);
410
411 HostName::Ip(net::IpAddr::V4(ip))
412 }
413 Ok(AddressType::Ipv6) => {
414 let octets: [u8; 16] = wire::Decode::decode(reader)?;
415 let ip = net::Ipv6Addr::from(octets);
416
417 HostName::Ip(net::IpAddr::V6(ip))
418 }
419 Ok(AddressType::Dns) => {
420 let dns: String = wire::Decode::decode(reader)?;
421
422 HostName::Dns(dns)
423 }
424 Ok(AddressType::Onion) => {
425 let onion: tor::OnionAddrV3 = wire::Decode::decode(reader)?;
426
427 HostName::Tor(onion)
428 }
429 Err(other) => return Err(wire::Error::UnknownAddressType(other)),
430 };
431 let port = u16::decode(reader)?;
432
433 Ok(Self::from(NetAddr { host, port }))
434 }
435}
436
437impl wire::Encode for ZeroBytes {
438 fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
439 let mut n = (self.len() as u16).encode(writer)?;
440 for _ in 0..self.len() {
441 n += 0u8.encode(writer)?;
442 }
443 Ok(n)
444 }
445}
446
447impl wire::Decode for ZeroBytes {
448 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
449 let zeroes = u16::decode(reader)?;
450 for _ in 0..zeroes {
451 _ = u8::decode(reader)?;
452 }
453 Ok(ZeroBytes::new(zeroes))
454 }
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use qcheck_macros::quickcheck;
    use radicle::node::UserAgent;
    use radicle::storage::refs::RefsAt;
    use radicle_crypto::test::signer::MockSigner;

    use crate::deserializer::Deserializer;
    use crate::test::arbitrary;
    use crate::wire::{self, Encode};

    /// Signs `ann`, wraps it in a [`Message`] and returns the length of its
    /// serialized form.
    fn signed_len(ann: AnnouncementMessage, signer: &MockSigner) -> usize {
        let msg = Message::Announcement(ann.signed(signer));

        wire::serialize(&msg).len()
    }

    /// A refs announcement holding `REF_REMOTE_LIMIT` refs serializes to fewer
    /// than `wire::Size::MAX` bytes.
    #[test]
    fn test_refs_ann_max_size() {
        let signer = MockSigner::default();
        let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::gen(1);
        let ann = AnnouncementMessage::Refs(RefsAnnouncement {
            rid: arbitrary::gen(1),
            refs: BoundedVec::collect_from(&mut refs.into_iter()),
            timestamp: arbitrary::gen(1),
        });

        assert!(signed_len(ann, &signer) < wire::Size::MAX as usize);
    }

    /// An inventory announcement holding `INVENTORY_LIMIT` repositories
    /// serializes to fewer than `wire::Size::MAX` bytes.
    #[test]
    fn test_inv_ann_max_size() {
        let signer = MockSigner::default();
        let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::gen(1);
        let ann = AnnouncementMessage::Inventory(InventoryAnnouncement {
            inventory: BoundedVec::collect_from(&mut inv.into_iter()),
            timestamp: arbitrary::gen(1),
        });

        assert!(signed_len(ann, &signer) < wire::Size::MAX as usize);
    }

    /// A node announcement with `ADDRESS_LIMIT` addresses and an alias of
    /// `MAX_ALIAS_LENGTH` characters serializes to fewer than
    /// `wire::Size::MAX` bytes.
    #[test]
    fn test_node_ann_max_size() {
        let signer = MockSigner::default();
        let addrs: [Address; ADDRESS_LIMIT] = arbitrary::gen(1);
        let alias = ['@'; radicle::node::MAX_ALIAS_LENGTH];
        let ann = AnnouncementMessage::Node(NodeAnnouncement {
            version: 1,
            features: Default::default(),
            alias: radicle::node::Alias::new(String::from_iter(alias)),
            addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
            timestamp: arbitrary::gen(1),
            nonce: u64::MAX,
            agent: UserAgent::default(),
        });

        assert!(signed_len(ann, &signer) < wire::Size::MAX as usize);
    }

    /// Pings and pongs carrying the maximum number of zero bytes still encode.
    #[test]
    fn test_pingpong_encode_max_size() {
        let mut buf = Vec::new();

        let largest_ping = Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
        });
        largest_ping
            .encode(&mut buf)
            .expect("ping should be within max message size");

        let largest_pong = Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
        };
        largest_pong
            .encode(&mut buf)
            .expect("pong should be within max message size");
    }

    /// One zero byte past the maximum makes encoding fail for pings and pongs.
    #[test]
    fn test_pingpong_encode_size_overflow() {
        let oversized_ping = Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
        });

        let mut ping_buf = Vec::new();
        oversized_ping
            .encode(&mut ping_buf)
            .expect_err("ping should exceed max message size");

        let oversized_pong = Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
        };

        let mut pong_buf = Vec::new();
        oversized_pong
            .encode(&mut pong_buf)
            .expect_err("pong should exceed max message size");
    }

    /// Serializing then deserializing a message yields the same message.
    #[quickcheck]
    fn prop_message_encode_decode(message: Message) {
        let bytes = wire::serialize(&message);
        let decoded = wire::deserialize::<Message>(&bytes).unwrap();

        assert_eq!(message, decoded);
    }

    /// Messages encoded into a `Deserializer` come back out in the same order.
    #[test]
    fn prop_message_decoder() {
        fn property(items: Vec<Message>) {
            let mut decoder = Deserializer::<1048576, Message>::new(8);

            // Feed every message in first, then drain them all.
            for msg in &items {
                msg.encode(&mut decoder).unwrap();
            }
            for expected in items {
                assert_eq!(decoder.next().unwrap().unwrap(), expected);
            }
        }

        let generator = qcheck::Gen::new(16);

        qcheck::QuickCheck::new()
            .gen(generator)
            .quickcheck(property as fn(items: Vec<Message>));
    }

    /// Serializing then deserializing `ZeroBytes` yields the same value.
    #[quickcheck]
    fn prop_zero_bytes_encode_decode(zeroes: ZeroBytes) {
        let bytes = wire::serialize(&zeroes);

        assert_eq!(wire::deserialize::<ZeroBytes>(&bytes).unwrap(), zeroes);
    }

    /// Serializing then deserializing an `Address` yields the same value.
    #[quickcheck]
    fn prop_addr(addr: Address) {
        let bytes = wire::serialize(&addr);

        assert_eq!(wire::deserialize::<Address>(&bytes).unwrap(), addr);
    }
}