1use std::{fmt, mem};
2
3use bytes::{Buf, BufMut};
4use nonempty::NonEmpty;
5
6use radicle::crypto;
7use radicle::git;
8use radicle::identity::RepoId;
9use radicle::node;
10use radicle::node::device::Device;
11use radicle::node::{Address, Alias, UserAgent};
12use radicle::storage;
13use radicle::storage::refs::RefsAt;
14
15use crate::bounded::BoundedVec;
16use crate::service::filter::Filter;
17use crate::service::{Link, NodeId, Timestamp};
18use crate::wire;
19use crate::wire::Encode as _;
20
/// Upper bound on the number of addresses carried by a [`NodeAnnouncement`].
pub const ADDRESS_LIMIT: usize = 16;
/// Upper bound on the number of [`RefsAt`] entries carried by a [`RefsAnnouncement`].
pub const REF_REMOTE_LIMIT: usize = 1024;
/// Upper bound on the number of repository ids carried by an
/// [`InventoryAnnouncement`].
pub const INVENTORY_LIMIT: usize = 2973;
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct Subscribe {
30 pub filter: Filter,
32 pub since: Timestamp,
34 pub until: Timestamp,
36}
37
38impl Subscribe {
39 pub fn all() -> Self {
40 Self {
41 filter: Filter::default(),
42 since: Timestamp::MIN,
43 until: Timestamp::MAX,
44 }
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
50pub struct NodeAnnouncement {
51 pub version: u8,
53 pub features: node::Features,
55 pub timestamp: Timestamp,
57 pub alias: Alias,
59 pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
61 pub nonce: u64,
63 pub agent: UserAgent,
65}
66
67impl NodeAnnouncement {
68 pub fn work(&self) -> u32 {
82 let (n, r, p) = Announcement::POW_PARAMS;
83 let params = scrypt::Params::new(n, r, p, 32).expect("proof-of-work parameters are valid");
84 let mut output = [0u8; 32];
85
86 scrypt::scrypt(
87 &self.encode_to_vec(),
88 Announcement::POW_SALT,
89 ¶ms,
90 &mut output,
91 )
92 .expect("proof-of-work output vector is a valid length");
93
94 if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|(_, &x)| x != 0) {
96 zero_bytes as u32 * 8 + non_zero.leading_zeros()
97 } else {
98 output.len() as u32 * 8
99 }
100 }
101
102 pub fn solve(mut self, target: u32) -> Option<Self> {
108 loop {
109 if let Some(nonce) = self.nonce.checked_add(1) {
110 self.nonce = nonce;
111
112 if self.work() >= target {
113 break;
114 }
115 } else {
116 return None;
117 }
118 }
119 Some(self)
120 }
121}
122
123impl wire::Encode for NodeAnnouncement {
124 fn encode(&self, buf: &mut impl BufMut) {
125 self.version.encode(buf);
126 self.features.encode(buf);
127 self.timestamp.encode(buf);
128 self.alias.encode(buf);
129 self.addresses.encode(buf);
130 self.nonce.encode(buf);
131 self.agent.encode(buf);
132 }
133}
134
135impl wire::Decode for NodeAnnouncement {
136 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
137 let version = u8::decode(buf)?;
138 let features = node::Features::decode(buf)?;
139 let timestamp = Timestamp::decode(buf)?;
140 let alias = wire::Decode::decode(buf)?;
141 let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(buf)?;
142 let nonce = u64::decode(buf)?;
143 let agent = match UserAgent::decode(buf) {
144 Ok(ua) => ua,
145 Err(wire::Error::UnexpectedEnd { .. }) => UserAgent::default(),
146 Err(e) => return Err(e),
147 };
148
149 Ok(Self {
150 version,
151 features,
152 timestamp,
153 alias,
154 addresses,
155 nonce,
156 agent,
157 })
158 }
159}
160
161#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct RefsAnnouncement {
164 pub rid: RepoId,
166 pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
168 pub timestamp: Timestamp,
170}
171
172#[derive(Default)]
174pub struct RefsStatus {
175 pub want: Vec<RefsAt>,
178 pub have: Vec<RefsAt>,
180}
181
182impl RefsStatus {
183 pub fn new<D: node::refs::Store>(
191 rid: RepoId,
192 refs: NonEmpty<RefsAt>,
193 db: &D,
194 ) -> Result<RefsStatus, storage::Error> {
195 let mut status = RefsStatus::default();
196 for theirs in refs.iter() {
197 status.insert(&rid, *theirs, db)?;
198 }
199 Ok(status)
200 }
201
202 fn insert<D: node::refs::Store>(
203 &mut self,
204 repo: &RepoId,
205 theirs: RefsAt,
206 db: &D,
207 ) -> Result<(), storage::Error> {
208 match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
209 Ok(Some((ours, _))) => {
210 if theirs.at != ours {
211 self.want.push(theirs);
212 } else {
213 self.have.push(theirs);
214 }
215 }
216 Ok(None) => {
217 self.want.push(theirs);
218 }
219 Err(e) => {
220 log::warn!(
221 target: "service",
222 "Error getting cached ref of {repo} for refs status: {e}"
223 );
224 }
225 }
226 Ok(())
227 }
228}
229
230#[derive(Debug, Clone, PartialEq, Eq)]
233pub struct InventoryAnnouncement {
234 pub inventory: BoundedVec<RepoId, INVENTORY_LIMIT>,
236 pub timestamp: Timestamp,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
245pub enum Info {
246 RefsAlreadySynced { rid: RepoId, at: git::Oid },
249}
250
251#[derive(Clone, PartialEq, Eq)]
253pub enum AnnouncementMessage {
254 Inventory(InventoryAnnouncement),
256 Node(NodeAnnouncement),
258 Refs(RefsAnnouncement),
260}
261
262impl AnnouncementMessage {
263 pub fn signed<G>(self, signer: &Device<G>) -> Announcement
265 where
266 G: crypto::signature::Signer<crypto::Signature>,
267 {
268 use crypto::signature::Signer as _;
269
270 let msg = self.encode_to_vec();
271
272 let signature = signer.sign(&msg);
273
274 Announcement {
275 node: *signer.public_key(),
276 message: self,
277 signature,
278 }
279 }
280
281 pub fn timestamp(&self) -> Timestamp {
282 match self {
283 Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
284 Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
285 Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
286 }
287 }
288
289 pub fn is_node_announcement(&self) -> bool {
290 matches!(self, Self::Node(_))
291 }
292}
293
294impl From<NodeAnnouncement> for AnnouncementMessage {
295 fn from(ann: NodeAnnouncement) -> Self {
296 Self::Node(ann)
297 }
298}
299
300impl From<InventoryAnnouncement> for AnnouncementMessage {
301 fn from(ann: InventoryAnnouncement) -> Self {
302 Self::Inventory(ann)
303 }
304}
305
306impl From<RefsAnnouncement> for AnnouncementMessage {
307 fn from(ann: RefsAnnouncement) -> Self {
308 Self::Refs(ann)
309 }
310}
311
312impl fmt::Debug for AnnouncementMessage {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 match self {
315 Self::Node(message) => write!(f, "Node({})", message.timestamp),
316 Self::Inventory(message) => {
317 write!(
318 f,
319 "Inventory([{}], {})",
320 message
321 .inventory
322 .iter()
323 .map(|i| i.to_string())
324 .collect::<Vec<String>>()
325 .join(", "),
326 message.timestamp
327 )
328 }
329 Self::Refs(message) => {
330 write!(
331 f,
332 "Refs({}, {}, {:?})",
333 message.rid, message.timestamp, message.refs
334 )
335 }
336 }
337 }
338}
339
340#[derive(Debug, Clone, PartialEq, Eq)]
341pub struct Announcement {
342 pub node: NodeId,
344 pub signature: crypto::Signature,
346 pub message: AnnouncementMessage,
348}
349
350impl Announcement {
351 #[cfg(debug_assertions)]
363 pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
364 #[cfg(not(debug_assertions))]
365 pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
366 pub const POW_SALT: &'static [u8] = b"rad";
368
369 pub fn verify(&self) -> bool {
371 let msg = self.message.encode_to_vec();
372 self.node.verify(msg, &self.signature).is_ok()
373 }
374
375 pub fn matches(&self, filter: &Filter) -> bool {
376 match &self.message {
377 AnnouncementMessage::Inventory(_) => true,
378 AnnouncementMessage::Node(_) => true,
379 AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
380 }
381 }
382
383 pub fn variant_eq(&self, other: &Self) -> bool {
385 std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
386 }
387
388 pub fn timestamp(&self) -> Timestamp {
390 self.message.timestamp()
391 }
392}
393
394#[derive(Clone, PartialEq, Eq)]
397pub enum Message {
398 Subscribe(Subscribe),
400
401 Announcement(Announcement),
404
405 Info(Info),
409
410 Ping(Ping),
415
416 Pong {
418 zeroes: ZeroBytes,
420 },
421}
422
423impl PartialOrd for Message {
424 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
425 Some(self.cmp(other))
426 }
427}
428
429impl Ord for Message {
430 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
431 let this = self.encode_to_vec();
432 let other = other.encode_to_vec();
433
434 this.cmp(&other)
435 }
436}
437
438impl Message {
439 pub fn announcement(
440 node: NodeId,
441 message: impl Into<AnnouncementMessage>,
442 signature: crypto::Signature,
443 ) -> Self {
444 Announcement {
445 node,
446 signature,
447 message: message.into(),
448 }
449 .into()
450 }
451
452 pub fn node<G: crypto::signature::Signer<crypto::Signature>>(
453 message: NodeAnnouncement,
454 signer: &Device<G>,
455 ) -> Self {
456 AnnouncementMessage::from(message).signed(signer).into()
457 }
458
459 pub fn inventory<G: crypto::signature::Signer<crypto::Signature>>(
460 message: InventoryAnnouncement,
461 signer: &Device<G>,
462 ) -> Self {
463 AnnouncementMessage::from(message).signed(signer).into()
464 }
465
466 pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
467 Self::Subscribe(Subscribe {
468 filter,
469 since,
470 until,
471 })
472 }
473
474 pub fn log(&self, level: log::Level, remote: &NodeId, link: Link) {
475 if !log::log_enabled!(level) {
476 return;
477 }
478 let (verb, prep) = if link.is_inbound() {
479 ("Received", "from")
480 } else {
481 ("Sending", "to")
482 };
483 let msg = match self {
484 Self::Announcement(Announcement { node, message, .. }) => match message {
485 AnnouncementMessage::Node(NodeAnnouncement { addresses, timestamp, .. }) => format!(
486 "{verb} node announcement of {node} with {} address(es) {prep} {remote} (t={timestamp})",
487 addresses.len()
488 ),
489 AnnouncementMessage::Refs(RefsAnnouncement { rid, refs, timestamp }) => format!(
490 "{verb} refs announcement of {node} for {rid} with {} remote(s) {prep} {remote} (t={timestamp})",
491 refs.len()
492 ),
493 AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, timestamp }) => {
494 format!(
495 "{verb} inventory announcement of {node} with {} item(s) {prep} {remote} (t={timestamp})",
496 inventory.len()
497 )
498 }
499 },
500 Self::Info(Info::RefsAlreadySynced { rid, .. }) => {
501 format!(
502 "{verb} `refs-already-synced` info {prep} {remote} for {rid}"
503 )
504 },
505 Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
506 Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
507 Self::Subscribe(Subscribe { .. }) => {
508 format!("{verb} subscription filter {prep} {remote}")
509 }
510 };
511 log::log!(target: "service", level, "{msg}");
512 }
513}
514
515#[derive(Debug, PartialEq, Eq, Clone)]
517pub struct Ping {
518 pub ponglen: wire::Size,
520 pub zeroes: ZeroBytes,
522}
523
524impl Ping {
525 pub const MAX_PING_ZEROES: wire::Size = Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size - mem::size_of::<wire::Size>() as wire::Size; pub const MAX_PONG_ZEROES: wire::Size =
532 Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size; pub fn new(rng: &mut fastrand::Rng) -> Self {
536 let ponglen = rng.u16(0..Self::MAX_PONG_ZEROES);
537
538 Ping {
539 ponglen,
540 zeroes: ZeroBytes::new(rng.u16(0..Self::MAX_PING_ZEROES)),
541 }
542 }
543}
544
545impl From<Announcement> for Message {
546 fn from(ann: Announcement) -> Self {
547 Self::Announcement(ann)
548 }
549}
550
551impl From<Info> for Message {
552 fn from(info: Info) -> Self {
553 Self::Info(info)
554 }
555}
556
557impl fmt::Debug for Message {
558 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
559 match self {
560 Self::Subscribe(Subscribe { since, until, .. }) => {
561 write!(f, "Subscribe({since}..{until})")
562 }
563 Self::Announcement(Announcement { node, message, .. }) => {
564 write!(f, "Announcement({node}, {message:?})")
565 }
566 Self::Info(info) => {
567 write!(f, "Info({info:?})")
568 }
569 Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
570 Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
571 }
572 }
573}
574
575#[derive(Clone, Debug, PartialEq, Eq)]
577pub struct ZeroBytes(wire::Size);
578
579impl ZeroBytes {
580 pub fn new(size: wire::Size) -> Self {
581 ZeroBytes(size)
582 }
583
584 pub fn is_empty(&self) -> bool {
585 self.0 == 0
586 }
587
588 pub fn len(&self) -> usize {
589 self.0.into()
590 }
591}
592
#[cfg(any(test, feature = "test"))]
#[allow(clippy::unwrap_used)]
impl qcheck::Arbitrary for Message {
    /// Generate a random message, picking one of seven message shapes.
    ///
    /// Signatures are random bytes, so generated announcements are not
    /// expected to pass signature verification.
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        use qcheck::Arbitrary;

        // Values are drawn from `g` in a fixed order within each arm, so that
        // a given generator state always produces the same message.
        match g.choose(&[1, 2, 3, 4, 5, 6, 7]).unwrap() {
            1 => {
                let node = NodeId::arbitrary(g);
                let message = AnnouncementMessage::Inventory(InventoryAnnouncement {
                    inventory: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                });
                let signature = crypto::Signature::from(<[u8; 64]>::arbitrary(g));

                Self::Announcement(Announcement {
                    node,
                    signature,
                    message,
                })
            }
            2 => {
                let node = NodeId::arbitrary(g);
                let message = AnnouncementMessage::Refs(RefsAnnouncement {
                    rid: RepoId::arbitrary(g),
                    refs: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                });
                let signature = crypto::Signature::from(<[u8; 64]>::arbitrary(g));

                Self::Announcement(Announcement {
                    node,
                    signature,
                    message,
                })
            }
            3 => {
                let message = AnnouncementMessage::Node(NodeAnnouncement {
                    version: u8::arbitrary(g),
                    features: u64::arbitrary(g).into(),
                    timestamp: Timestamp::arbitrary(g),
                    alias: Alias::arbitrary(g),
                    addresses: Arbitrary::arbitrary(g),
                    nonce: u64::arbitrary(g),
                    agent: UserAgent::arbitrary(g),
                });
                let signature = crypto::Signature::from(<[u8; 64]>::arbitrary(g));
                let node = NodeId::arbitrary(g);

                Self::Announcement(Announcement {
                    node,
                    signature,
                    message,
                })
            }
            4 => Self::Info(Info::RefsAlreadySynced {
                rid: RepoId::arbitrary(g),
                at: radicle::test::arbitrary::oid(),
            }),
            5 => {
                let filter = Filter::arbitrary(g);
                let since = Timestamp::arbitrary(g);
                let until = Timestamp::arbitrary(g);

                Self::Subscribe(Subscribe {
                    filter,
                    since,
                    until,
                })
            }
            6 => {
                let seed = u64::arbitrary(g);
                let mut rng = fastrand::Rng::with_seed(seed);

                Self::Ping(Ping::new(&mut rng))
            }
            7 => {
                // Clamp so the pong never exceeds the maximum payload size.
                let size = u16::arbitrary(g).min(Ping::MAX_PONG_ZEROES);

                Self::Pong {
                    zeroes: ZeroBytes::new(size),
                }
            }
            _ => panic!("Invalid choice for Message::arbitrary"),
        }
    }
}
666
#[cfg(any(test, feature = "test"))]
impl qcheck::Arbitrary for ZeroBytes {
    /// Generate a payload whose length is an arbitrary `u16`.
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        let size = <u16 as qcheck::Arbitrary>::arbitrary(g);

        Self::new(size)
    }
}
673
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use std::str::FromStr;

    use fastrand;
    use localtime::LocalTime;
    use qcheck_macros::quickcheck;
    use radicle::git::raw;
    use radicle::test::arbitrary;

    use crate::wire::Decode as _;

    use super::*;

    /// A refs announcement filled up to `REF_REMOTE_LIMIT` must survive an
    /// encode/decode round trip.
    #[test]
    fn test_ref_remote_limit() {
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
        let signer = Device::mock();
        let at = raw::Oid::zero().into();

        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);

        let remote = *signer.public_key();
        for _ in 0..refs.capacity() {
            refs.push(RefsAt { remote, at }).unwrap();
        }

        let announcement = RefsAnnouncement {
            rid: arbitrary::gen(1),
            refs,
            timestamp: LocalTime::now().into(),
        };
        let msg: Message = AnnouncementMessage::from(announcement)
            .signed(&Device::mock())
            .into();

        let mut bytes = Vec::new();
        msg.encode(&mut bytes);

        let decoded = Message::decode_exact(bytes.as_slice());
        assert!(decoded.is_ok());
        assert_eq!(msg, decoded.unwrap());
    }

    /// An inventory announcement filled up to `INVENTORY_LIMIT` must survive
    /// an encode/decode round trip.
    #[test]
    fn test_inventory_limit() {
        let announcement = InventoryAnnouncement {
            inventory: arbitrary::vec(INVENTORY_LIMIT)
                .try_into()
                .expect("size within bounds limit"),
            timestamp: LocalTime::now().into(),
        };
        let msg = Message::inventory(announcement, &Device::mock());

        let mut bytes: Vec<u8> = Vec::new();
        msg.encode(&mut bytes);

        let decoded = Message::decode_exact(bytes.as_slice());
        assert!(
            decoded.is_ok(),
            "INVENTORY_LIMIT is a valid limit for decoding"
        );
        assert_eq!(
            msg,
            decoded.unwrap(),
            "encoding and decoding should be safe for message at INVENTORY_LIMIT",
        );
    }

    /// A refs announcement signed by a device must verify against that
    /// device's public key.
    #[quickcheck]
    fn prop_refs_announcement_signing(rid: RepoId) {
        let signer = Device::mock_rng(&mut fastrand::Rng::new());
        let timestamp = Timestamp::EPOCH;
        let entry = RefsAt {
            remote: *signer.public_key(),
            at: raw::Oid::zero().into(),
        };
        let refs = BoundedVec::collect_from(&mut [entry].into_iter());
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
            rid,
            refs,
            timestamp,
        });

        assert!(message.signed(&signer).verify());
    }

    /// Pin the proof-of-work values of a fixed announcement, before and after
    /// solving for several targets.
    #[test]
    fn test_node_announcement_validate() {
        let ann = NodeAnnouncement {
            version: 1,
            features: node::Features::SEED,
            timestamp: Timestamp::try_from(42491841u64).unwrap(),
            alias: Alias::new("alice"),
            addresses: BoundedVec::new(),
            nonce: 0,
            agent: UserAgent::from_str("/heartwood:1.0.0/").unwrap(),
        };

        assert_eq!(ann.work(), 1);

        // Each search restarts from the unsolved announcement.
        let solved_work = |target: u32| ann.clone().solve(target).unwrap().work();

        assert_eq!(solved_work(1), 1);
        assert_eq!(solved_work(8), 10);
        assert_eq!(solved_work(14), 14);
    }
}