1use std::str::FromStr;
2use std::{fmt, mem};
3
4use bytes::{Buf, BufMut};
5use nonempty::NonEmpty;
6
7use radicle::crypto;
8use radicle::git;
9use radicle::identity::RepoId;
10use radicle::node;
11use radicle::node::device::Device;
12use radicle::node::{Address, Alias, UserAgent};
13use radicle::storage;
14use radicle::storage::refs::RefsAt;
15
16use crate::bounded::BoundedVec;
17use crate::service::filter::Filter;
18use crate::service::{Link, NodeId, Timestamp};
19use crate::wire;
20use crate::wire::Encode as _;
21
/// Upper bound on the number of addresses a [`NodeAnnouncement`] can carry.
pub const ADDRESS_LIMIT: usize = 16;
/// Upper bound on the number of [`RefsAt`] entries a [`RefsAnnouncement`] can carry.
pub const REF_REMOTE_LIMIT: usize = 1_024;
/// Upper bound on the number of repository ids an [`InventoryAnnouncement`] can carry.
// NOTE(review): the derivation of this specific value is not visible in this file.
pub const INVENTORY_LIMIT: usize = 2_973;
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct Subscribe {
31 pub filter: Filter,
33 pub since: Timestamp,
35 pub until: Timestamp,
37}
38
39impl Subscribe {
40 pub fn all() -> Self {
41 Self {
42 filter: Filter::default(),
43 since: Timestamp::MIN,
44 until: Timestamp::MAX,
45 }
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
51pub struct NodeAnnouncement {
52 pub version: u8,
54 pub features: node::Features,
56 pub timestamp: Timestamp,
58 pub alias: Alias,
60 pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
62 pub nonce: u64,
64 pub agent: UserAgent,
66}
67
68impl NodeAnnouncement {
69 pub fn work(&self) -> u32 {
83 let (n, r, p) = Announcement::POW_PARAMS;
84 let params = scrypt::Params::new(n, r, p, 32).expect("proof-of-work parameters are valid");
85 let mut output = [0u8; 32];
86
87 scrypt::scrypt(
88 &self.encode_to_vec(),
89 Announcement::POW_SALT,
90 ¶ms,
91 &mut output,
92 )
93 .expect("proof-of-work output vector is a valid length");
94
95 if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|&(_, &x)| x != 0) {
97 zero_bytes as u32 * 8 + non_zero.leading_zeros()
98 } else {
99 output.len() as u32 * 8
100 }
101 }
102
103 pub fn solve(mut self, target: u32) -> Option<Self> {
109 loop {
110 if let Some(nonce) = self.nonce.checked_add(1) {
111 self.nonce = nonce;
112
113 if self.work() >= target {
114 break;
115 }
116 } else {
117 return None;
118 }
119 }
120 Some(self)
121 }
122}
123
124impl wire::Encode for NodeAnnouncement {
125 fn encode(&self, buf: &mut impl BufMut) {
126 self.version.encode(buf);
127 self.features.encode(buf);
128 self.timestamp.encode(buf);
129 self.alias.encode(buf);
130 self.addresses.encode(buf);
131 self.nonce.encode(buf);
132 self.agent.encode(buf);
133 }
134}
135
136impl wire::Decode for NodeAnnouncement {
137 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
138 let version = u8::decode(buf)?;
139 let features = node::Features::decode(buf)?;
140 let timestamp = Timestamp::decode(buf)?;
141 let alias = wire::Decode::decode(buf)?;
142 let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(buf)?;
143 let nonce = u64::decode(buf)?;
144 let agent = match UserAgent::decode(buf) {
145 Ok(ua) => ua,
146 Err(wire::Error::UnexpectedEnd { .. }) => {
147 UserAgent::from_str("/radicle/message/truncated/").expect("valid user agent")
148 }
149 Err(e) => return Err(e),
150 };
151
152 Ok(Self {
153 version,
154 features,
155 timestamp,
156 alias,
157 addresses,
158 nonce,
159 agent,
160 })
161 }
162}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
166pub struct RefsAnnouncement {
167 pub rid: RepoId,
169 pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
171 pub timestamp: Timestamp,
173}
174
175#[derive(Default)]
177pub struct RefsStatus {
178 pub want: Vec<RefsAt>,
181 pub have: Vec<RefsAt>,
183}
184
185impl RefsStatus {
186 pub fn new<D: node::refs::Store>(
194 rid: RepoId,
195 refs: NonEmpty<RefsAt>,
196 db: &D,
197 ) -> Result<RefsStatus, storage::Error> {
198 let mut status = RefsStatus::default();
199 for theirs in refs.iter() {
200 status.insert(&rid, *theirs, db)?;
201 }
202 Ok(status)
203 }
204
205 fn insert<D: node::refs::Store>(
206 &mut self,
207 repo: &RepoId,
208 theirs: RefsAt,
209 db: &D,
210 ) -> Result<(), storage::Error> {
211 match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
212 Ok(Some((ours, _))) => {
213 if theirs.at != ours {
214 self.want.push(theirs);
215 } else {
216 self.have.push(theirs);
217 }
218 }
219 Ok(None) => {
220 self.want.push(theirs);
221 }
222 Err(e) => {
223 log::debug!(
224 target: "service",
225 "Failed to get cached 'rad/sigrefs' of {} in {repo} for refs status: {e}", theirs.remote,
226 );
227 }
228 }
229 Ok(())
230 }
231}
232
233#[derive(Debug, Clone, PartialEq, Eq)]
236pub struct InventoryAnnouncement {
237 pub inventory: BoundedVec<RepoId, INVENTORY_LIMIT>,
239 pub timestamp: Timestamp,
241}
242
243#[derive(Debug, Clone, PartialEq, Eq)]
248pub enum Info {
249 RefsAlreadySynced { rid: RepoId, at: git::Oid },
252}
253
254#[derive(Clone, PartialEq, Eq)]
256pub enum AnnouncementMessage {
257 Inventory(InventoryAnnouncement),
259 Node(NodeAnnouncement),
261 Refs(RefsAnnouncement),
263}
264
265impl AnnouncementMessage {
266 pub fn signed<G>(self, signer: &Device<G>) -> Announcement
268 where
269 G: crypto::signature::Signer<crypto::Signature>,
270 {
271 use crypto::signature::Signer as _;
272
273 let msg = self.encode_to_vec();
274
275 let signature = signer.sign(&msg);
276
277 Announcement {
278 node: *signer.public_key(),
279 message: self,
280 signature,
281 }
282 }
283
284 pub fn timestamp(&self) -> Timestamp {
285 match self {
286 Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
287 Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
288 Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
289 }
290 }
291
292 pub fn is_node_announcement(&self) -> bool {
293 matches!(self, Self::Node(_))
294 }
295}
296
297impl From<NodeAnnouncement> for AnnouncementMessage {
298 fn from(ann: NodeAnnouncement) -> Self {
299 Self::Node(ann)
300 }
301}
302
303impl From<InventoryAnnouncement> for AnnouncementMessage {
304 fn from(ann: InventoryAnnouncement) -> Self {
305 Self::Inventory(ann)
306 }
307}
308
309impl From<RefsAnnouncement> for AnnouncementMessage {
310 fn from(ann: RefsAnnouncement) -> Self {
311 Self::Refs(ann)
312 }
313}
314
315impl fmt::Debug for AnnouncementMessage {
316 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317 match self {
318 Self::Node(message) => write!(f, "Node({})", message.timestamp),
319 Self::Inventory(message) => {
320 write!(
321 f,
322 "Inventory([{}], {})",
323 message
324 .inventory
325 .iter()
326 .map(|i| i.to_string())
327 .collect::<Vec<String>>()
328 .join(", "),
329 message.timestamp
330 )
331 }
332 Self::Refs(message) => {
333 write!(
334 f,
335 "Refs({}, {}, {:?})",
336 message.rid, message.timestamp, message.refs
337 )
338 }
339 }
340 }
341}
342
343#[derive(Debug, Clone, PartialEq, Eq)]
344pub struct Announcement {
345 pub node: NodeId,
347 pub signature: crypto::Signature,
349 pub message: AnnouncementMessage,
351}
352
353impl Announcement {
354 #[cfg(debug_assertions)]
366 pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
367 #[cfg(not(debug_assertions))]
368 pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
369 pub const POW_SALT: &'static [u8] = b"rad";
371
372 pub fn verify(&self) -> bool {
374 let msg = self.message.encode_to_vec();
375 self.node.verify(msg, &self.signature).is_ok()
376 }
377
378 pub fn matches(&self, filter: &Filter) -> bool {
379 match &self.message {
380 AnnouncementMessage::Inventory(_) => true,
381 AnnouncementMessage::Node(_) => true,
382 AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
383 }
384 }
385
386 pub fn variant_eq(&self, other: &Self) -> bool {
388 std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
389 }
390
391 pub fn timestamp(&self) -> Timestamp {
393 self.message.timestamp()
394 }
395}
396
397#[derive(Clone, PartialEq, Eq)]
400pub enum Message {
401 Subscribe(Subscribe),
403
404 Announcement(Announcement),
407
408 Info(Info),
412
413 Ping(Ping),
418
419 Pong {
421 zeroes: ZeroBytes,
423 },
424}
425
426impl PartialOrd for Message {
427 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
428 Some(self.cmp(other))
429 }
430}
431
432impl Ord for Message {
433 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
434 let this = self.encode_to_vec();
435 let other = other.encode_to_vec();
436
437 this.cmp(&other)
438 }
439}
440
441impl Message {
442 pub fn announcement(
443 node: NodeId,
444 message: impl Into<AnnouncementMessage>,
445 signature: crypto::Signature,
446 ) -> Self {
447 Announcement {
448 node,
449 signature,
450 message: message.into(),
451 }
452 .into()
453 }
454
455 pub fn node<G: crypto::signature::Signer<crypto::Signature>>(
456 message: NodeAnnouncement,
457 signer: &Device<G>,
458 ) -> Self {
459 AnnouncementMessage::from(message).signed(signer).into()
460 }
461
462 pub fn inventory<G: crypto::signature::Signer<crypto::Signature>>(
463 message: InventoryAnnouncement,
464 signer: &Device<G>,
465 ) -> Self {
466 AnnouncementMessage::from(message).signed(signer).into()
467 }
468
469 pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
470 Self::Subscribe(Subscribe {
471 filter,
472 since,
473 until,
474 })
475 }
476
477 pub fn log(&self, level: log::Level, remote: &NodeId, link: Link) {
478 if !log::log_enabled!(level) {
479 return;
480 }
481 let (verb, prep) = if link.is_inbound() {
482 ("Received", "from")
483 } else {
484 ("Sending", "to")
485 };
486 let msg = match self {
487 Self::Announcement(Announcement { node, message, .. }) => match message {
488 AnnouncementMessage::Node(NodeAnnouncement {
489 addresses,
490 timestamp,
491 ..
492 }) => format!(
493 "{verb} node announcement of {node} with {} address(es) {prep} {remote} (t={timestamp})",
494 addresses.len()
495 ),
496 AnnouncementMessage::Refs(RefsAnnouncement {
497 rid,
498 refs,
499 timestamp,
500 }) => format!(
501 "{verb} refs announcement of {node} for {rid} with {} remote(s) {prep} {remote} (t={timestamp})",
502 refs.len()
503 ),
504 AnnouncementMessage::Inventory(InventoryAnnouncement {
505 inventory,
506 timestamp,
507 }) => {
508 format!(
509 "{verb} inventory announcement of {node} with {} item(s) {prep} {remote} (t={timestamp})",
510 inventory.len()
511 )
512 }
513 },
514 Self::Info(Info::RefsAlreadySynced { rid, .. }) => {
515 format!("{verb} `refs-already-synced` info {prep} {remote} for {rid}")
516 }
517 Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
518 Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
519 Self::Subscribe(Subscribe { .. }) => {
520 format!("{verb} subscription filter {prep} {remote}")
521 }
522 };
523 log::log!(target: "service", level, "{msg}");
524 }
525}
526
527#[derive(Debug, PartialEq, Eq, Clone)]
529pub struct Ping {
530 pub ponglen: wire::Size,
532 pub zeroes: ZeroBytes,
534}
535
536impl Ping {
537 pub const MAX_PING_ZEROES: wire::Size = Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size - mem::size_of::<wire::Size>() as wire::Size; pub const MAX_PONG_ZEROES: wire::Size =
544 Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size; pub fn new(rng: &mut fastrand::Rng) -> Self {
548 let ponglen = rng.u16(0..Self::MAX_PONG_ZEROES);
549
550 Ping {
551 ponglen,
552 zeroes: ZeroBytes::new(rng.u16(0..Self::MAX_PING_ZEROES)),
553 }
554 }
555}
556
557impl From<Announcement> for Message {
558 fn from(ann: Announcement) -> Self {
559 Self::Announcement(ann)
560 }
561}
562
563impl From<Info> for Message {
564 fn from(info: Info) -> Self {
565 Self::Info(info)
566 }
567}
568
569impl fmt::Debug for Message {
570 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
571 match self {
572 Self::Subscribe(Subscribe { since, until, .. }) => {
573 write!(f, "Subscribe({since}..{until})")
574 }
575 Self::Announcement(Announcement { node, message, .. }) => {
576 write!(f, "Announcement({node}, {message:?})")
577 }
578 Self::Info(info) => {
579 write!(f, "Info({info:?})")
580 }
581 Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
582 Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
583 }
584 }
585}
586
587#[derive(Clone, Debug, PartialEq, Eq)]
589pub struct ZeroBytes(wire::Size);
590
591impl ZeroBytes {
592 pub fn new(size: wire::Size) -> Self {
593 ZeroBytes(size)
594 }
595
596 pub fn is_empty(&self) -> bool {
597 self.0 == 0
598 }
599
600 pub fn len(&self) -> usize {
601 self.0.into()
602 }
603}
604
#[cfg(any(test, feature = "test"))]
#[allow(clippy::unwrap_used)]
impl qcheck::Arbitrary for Message {
    /// Generate an arbitrary message, picking one of seven shapes: the three
    /// announcement kinds, info, subscribe, ping and pong.
    ///
    /// Signatures are random bytes, so generated announcements are not
    /// expected to verify.
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        use qcheck::Arbitrary;

        let choice = *g.choose(&[1, 2, 3, 4, 5, 6, 7]).unwrap();

        match choice {
            1 => {
                let node = NodeId::arbitrary(g);
                let message = AnnouncementMessage::Inventory(InventoryAnnouncement {
                    inventory: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                });
                let signature = crypto::Signature::from(<[u8; 64]>::arbitrary(g));

                Self::Announcement(Announcement {
                    node,
                    signature,
                    message,
                })
            }
            2 => {
                let node = NodeId::arbitrary(g);
                let message = AnnouncementMessage::Refs(RefsAnnouncement {
                    rid: RepoId::arbitrary(g),
                    refs: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                });
                let signature = crypto::Signature::from(<[u8; 64]>::arbitrary(g));

                Self::Announcement(Announcement {
                    node,
                    signature,
                    message,
                })
            }
            3 => {
                // The payload is generated before the signature bytes and
                // the node id.
                let message = AnnouncementMessage::Node(NodeAnnouncement {
                    version: u8::arbitrary(g),
                    features: u64::arbitrary(g).into(),
                    timestamp: Timestamp::arbitrary(g),
                    alias: Alias::arbitrary(g),
                    addresses: Arbitrary::arbitrary(g),
                    nonce: u64::arbitrary(g),
                    agent: UserAgent::arbitrary(g),
                });
                let raw: [u8; 64] = Arbitrary::arbitrary(g);
                let signature = crypto::Signature::from(raw);
                let node = NodeId::arbitrary(g);

                Self::Announcement(Announcement {
                    node,
                    signature,
                    message,
                })
            }
            4 => Self::Info(Info::RefsAlreadySynced {
                rid: RepoId::arbitrary(g),
                at: radicle::test::arbitrary::oid(),
            }),
            5 => {
                let filter = Filter::arbitrary(g);
                let since = Timestamp::arbitrary(g);
                let until = Timestamp::arbitrary(g);

                Self::Subscribe(Subscribe {
                    filter,
                    since,
                    until,
                })
            }
            6 => {
                let seed = u64::arbitrary(g);
                let mut rng = fastrand::Rng::with_seed(seed);

                Self::Ping(Ping::new(&mut rng))
            }
            7 => {
                // Clamp so the pong never exceeds the maximum zero count.
                let count = u16::arbitrary(g).min(Ping::MAX_PONG_ZEROES);

                Self::Pong {
                    zeroes: ZeroBytes::new(count),
                }
            }
            _ => panic!("Invalid choice for Message::arbitrary"),
        }
    }
}
678
#[cfg(any(test, feature = "test"))]
impl qcheck::Arbitrary for ZeroBytes {
    /// Generate a zero-byte count from an arbitrary `u16`.
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        let count = <u16 as qcheck::Arbitrary>::arbitrary(g);

        Self::new(count)
    }
}
685
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {

    use fastrand;
    use localtime::LocalTime;
    use qcheck_macros::quickcheck;
    use radicle::test::arbitrary;

    use crate::wire::Decode as _;

    use super::*;

    /// A refs announcement filled to `REF_REMOTE_LIMIT` entries must survive
    /// an encode/decode round trip.
    #[test]
    fn test_ref_remote_limit() {
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
        let signer = Device::mock();
        let at = git::Oid::ZERO_SHA1;

        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);

        let remote = *signer.public_key();
        for _ in 0..refs.capacity() {
            refs.push(RefsAt { remote, at }).unwrap();
        }

        let announcement = RefsAnnouncement {
            rid: arbitrary::r#gen(1),
            refs,
            timestamp: LocalTime::now().into(),
        };
        let msg: Message = AnnouncementMessage::from(announcement)
            .signed(&Device::mock())
            .into();

        let mut buf = Vec::new();
        msg.encode(&mut buf);

        let decoded = Message::decode_exact(buf.as_slice());
        assert!(decoded.is_ok());
        assert_eq!(msg, decoded.unwrap());
    }

    /// An inventory announcement filled to `INVENTORY_LIMIT` entries must
    /// survive an encode/decode round trip.
    #[test]
    fn test_inventory_limit() {
        let announcement = InventoryAnnouncement {
            inventory: arbitrary::vec(INVENTORY_LIMIT)
                .try_into()
                .expect("size within bounds limit"),
            timestamp: LocalTime::now().into(),
        };
        let msg = Message::inventory(announcement, &Device::mock());

        let mut buf: Vec<u8> = Vec::new();
        msg.encode(&mut buf);

        let decoded = Message::decode_exact(buf.as_slice());
        assert!(
            decoded.is_ok(),
            "INVENTORY_LIMIT is a valid limit for decoding"
        );
        assert_eq!(
            msg,
            decoded.unwrap(),
            "encoding and decoding should be safe for message at INVENTORY_LIMIT",
        );
    }

    /// A refs announcement signed by a device must verify against that
    /// device's public key.
    #[quickcheck]
    fn prop_refs_announcement_signing(rid: RepoId) {
        let signer = Device::mock_rng(&mut fastrand::Rng::new());
        let timestamp = Timestamp::EPOCH;
        let at = git::Oid::ZERO_SHA1;
        let entry = RefsAt {
            remote: *signer.public_key(),
            at,
        };
        let refs = BoundedVec::collect_from(&mut [entry].into_iter());
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
            rid,
            refs,
            timestamp,
        });

        assert!(message.signed(&signer).verify());
    }

    /// Pins the proof-of-work value of a fixed announcement and of the
    /// solutions found for several targets.
    #[test]
    fn test_node_announcement_validate() {
        let ann = NodeAnnouncement {
            version: 1,
            features: node::Features::SEED,
            timestamp: Timestamp::try_from(42491841u64).unwrap(),
            alias: Alias::new("alice"),
            addresses: BoundedVec::new(),
            nonce: 0,
            agent: UserAgent::test(),
        };

        assert_eq!(ann.work(), 2);

        let solved = ann.clone().solve(1).unwrap();
        assert_eq!(solved.work(), 1);

        let solved = ann.clone().solve(8).unwrap();
        assert_eq!(solved.work(), 8);

        let solved = ann.solve(14).unwrap();
        assert_eq!(solved.work(), 14);
    }
}