radicle_node/service/
message.rs

1use std::{fmt, io, mem};
2
3use nonempty::NonEmpty;
4use radicle::git;
5use radicle::node::device::Device;
6use radicle::storage::refs::RefsAt;
7
8use crate::crypto;
9use crate::identity::RepoId;
10use crate::node;
11use crate::node::{Address, Alias, UserAgent};
12use crate::prelude::BoundedVec;
13use crate::service::filter::Filter;
14use crate::service::{Link, NodeId, Timestamp};
15use crate::storage;
16use crate::wire;
17
18/// Maximum number of addresses which can be announced to other nodes.
19pub const ADDRESS_LIMIT: usize = 16;
20/// Maximum number of repository remotes that can be included in a [`RefsAnnouncement`] message.
21pub const REF_REMOTE_LIMIT: usize = 1024;
22/// Maximum number of inventory which can be announced to other nodes.
23pub const INVENTORY_LIMIT: usize = 2973;
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct Subscribe {
27    /// Subscribe to events matching this filter.
28    pub filter: Filter,
29    /// Request messages since this time.
30    pub since: Timestamp,
31    /// Request messages until this time.
32    pub until: Timestamp,
33}
34
35impl Subscribe {
36    pub fn all() -> Self {
37        Self {
38            filter: Filter::default(),
39            since: Timestamp::MIN,
40            until: Timestamp::MAX,
41        }
42    }
43}
44
45/// Node announcing itself to the network.
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct NodeAnnouncement {
48    /// Supported protocol version.
49    pub version: u8,
50    /// Advertized features.
51    pub features: node::Features,
52    /// Monotonic timestamp.
53    pub timestamp: Timestamp,
54    /// Non-unique alias.
55    pub alias: Alias,
56    /// Announced addresses.
57    pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
58    /// Nonce used for announcement proof-of-work.
59    pub nonce: u64,
60    /// User-agent string.
61    pub agent: UserAgent,
62}
63
64impl NodeAnnouncement {
65    /// Calculate the amount of work that went into creating this announcement.
66    ///
67    /// Proof-of-work uses the [`scrypt`] algorithm with the parameters in
68    /// [`Announcement::POW_PARAMS`]. The "work" is calculated by counting the number of leading
69    /// zero bits after running `scrypt` on a serialized [`NodeAnnouncement`] using
70    /// [`wire::serialize`].
71    ///
72    /// In other words, `work = leading-zeros(scrypt(serialize(announcement)))`.
73    ///
74    /// Higher numbers mean higher difficulty. For each increase in work, difficulty is doubled.
75    /// For instance, an output of `7` is *four* times more work than an output of `5`.
76    ///
77    pub fn work(&self) -> u32 {
78        let (n, r, p) = Announcement::POW_PARAMS;
79        let params = scrypt::Params::new(n, r, p, 32).expect("proof-of-work parameters are valid");
80        let mut output = [0u8; 32];
81
82        scrypt::scrypt(
83            wire::serialize(self).as_ref(),
84            Announcement::POW_SALT,
85            &params,
86            &mut output,
87        )
88        .expect("proof-of-work output vector is a valid length");
89
90        // Calculate the number of leading zero bits in the output vector.
91        if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|(_, &x)| x != 0) {
92            zero_bytes as u32 * 8 + non_zero.leading_zeros()
93        } else {
94            output.len() as u32 * 8
95        }
96    }
97
98    /// Solve the proof-of-work of a node announcement for the given target, by iterating through
99    /// different nonces.
100    ///
101    /// If the given difficulty target is too high, there may not be a result. In that case, `None`
102    /// is returned.
103    pub fn solve(mut self, target: u32) -> Option<Self> {
104        loop {
105            if let Some(nonce) = self.nonce.checked_add(1) {
106                self.nonce = nonce;
107
108                if self.work() >= target {
109                    break;
110                }
111            } else {
112                return None;
113            }
114        }
115        Some(self)
116    }
117}
118
119impl wire::Encode for NodeAnnouncement {
120    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
121        let mut n = 0;
122
123        n += self.version.encode(writer)?;
124        n += self.features.encode(writer)?;
125        n += self.timestamp.encode(writer)?;
126        n += self.alias.encode(writer)?;
127        n += self.addresses.encode(writer)?;
128        n += self.nonce.encode(writer)?;
129        n += self.agent.encode(writer)?;
130
131        Ok(n)
132    }
133}
134
135impl wire::Decode for NodeAnnouncement {
136    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
137        let version = u8::decode(reader)?;
138        let features = node::Features::decode(reader)?;
139        let timestamp = Timestamp::decode(reader)?;
140        let alias = wire::Decode::decode(reader)?;
141        let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(reader)?;
142        let nonce = u64::decode(reader)?;
143        let agent = match UserAgent::decode(reader) {
144            Ok(ua) => ua,
145            Err(e) if e.is_eof() => UserAgent::default(),
146            Err(e) => return Err(e),
147        };
148
149        Ok(Self {
150            version,
151            features,
152            timestamp,
153            alias,
154            addresses,
155            nonce,
156            agent,
157        })
158    }
159}
160
161/// Node announcing project refs being created or updated.
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct RefsAnnouncement {
164    /// Repository identifier.
165    pub rid: RepoId,
166    /// Updated `rad/sigrefs`.
167    pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
168    /// Time of announcement.
169    pub timestamp: Timestamp,
170}
171
172/// Track the status of `RefsAt` within a given repository.
173#[derive(Default)]
174pub struct RefsStatus {
175    /// The `rad/sigrefs` was missing or it's ahead of the local
176    /// `rad/sigrefs`. We want it.
177    pub want: Vec<RefsAt>,
178    /// The `rad/sigrefs` has been seen before. We already have it.
179    pub have: Vec<RefsAt>,
180}
181
182impl RefsStatus {
183    /// Get the set of `want` and `have` `RefsAt`'s for the given
184    /// announcement.
185    ///
186    /// Nb. We use the refs database as a cache for quick lookups. This does *not* check
187    /// for ancestry matches, since we don't cache the whole history (only the tips).
188    /// This, however, is not a problem because the signed refs branch is fast-forward only,
189    /// and old refs announcements will be discarded due to their lower timestamps.
190    pub fn new<D: node::refs::Store>(
191        rid: RepoId,
192        refs: NonEmpty<RefsAt>,
193        db: &D,
194    ) -> Result<RefsStatus, storage::Error> {
195        let mut status = RefsStatus::default();
196        for theirs in refs.iter() {
197            status.insert(&rid, *theirs, db)?;
198        }
199        Ok(status)
200    }
201
202    fn insert<D: node::refs::Store>(
203        &mut self,
204        repo: &RepoId,
205        theirs: RefsAt,
206        db: &D,
207    ) -> Result<(), storage::Error> {
208        match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
209            Ok(Some((ours, _))) => {
210                if theirs.at != ours {
211                    self.want.push(theirs);
212                } else {
213                    self.have.push(theirs);
214                }
215            }
216            Ok(None) => {
217                self.want.push(theirs);
218            }
219            Err(e) => {
220                log::warn!(
221                    target: "service",
222                    "Error getting cached ref of {repo} for refs status: {e}"
223                );
224            }
225        }
226        Ok(())
227    }
228}
229
230/// Node announcing its inventory to the network.
231/// This should be the whole inventory every time.
232#[derive(Debug, Clone, PartialEq, Eq)]
233pub struct InventoryAnnouncement {
234    /// Node inventory.
235    pub inventory: BoundedVec<RepoId, INVENTORY_LIMIT>,
236    /// Time of announcement.
237    pub timestamp: Timestamp,
238}
239
240/// Node announcing information to a connected peer.
241///
242/// This should not be relayed and should be used to send an
243/// informational message a peer.
244#[derive(Debug, Clone, PartialEq, Eq)]
245pub enum Info {
246    /// Tell a node that sent a refs announcement that it was already synced at the given `Oid`,
247    /// for this particular `rid`.
248    RefsAlreadySynced { rid: RepoId, at: git::Oid },
249}
250
251/// Announcement messages are messages that are relayed between peers.
252#[derive(Clone, PartialEq, Eq)]
253pub enum AnnouncementMessage {
254    /// Inventory announcement.
255    Inventory(InventoryAnnouncement),
256    /// Node announcement.
257    Node(NodeAnnouncement),
258    /// Refs announcement.
259    Refs(RefsAnnouncement),
260}
261
262impl AnnouncementMessage {
263    /// Sign this announcement message.
264    pub fn signed<G>(self, signer: &Device<G>) -> Announcement
265    where
266        G: crypto::signature::Signer<crypto::Signature>,
267    {
268        use crypto::signature::Signer as _;
269
270        let msg = wire::serialize(&self);
271        let signature = signer.sign(&msg);
272
273        Announcement {
274            node: *signer.public_key(),
275            message: self,
276            signature,
277        }
278    }
279
280    pub fn timestamp(&self) -> Timestamp {
281        match self {
282            Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
283            Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
284            Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
285        }
286    }
287
288    pub fn is_node_announcement(&self) -> bool {
289        matches!(self, Self::Node(_))
290    }
291}
292
293impl From<NodeAnnouncement> for AnnouncementMessage {
294    fn from(ann: NodeAnnouncement) -> Self {
295        Self::Node(ann)
296    }
297}
298
299impl From<InventoryAnnouncement> for AnnouncementMessage {
300    fn from(ann: InventoryAnnouncement) -> Self {
301        Self::Inventory(ann)
302    }
303}
304
305impl From<RefsAnnouncement> for AnnouncementMessage {
306    fn from(ann: RefsAnnouncement) -> Self {
307        Self::Refs(ann)
308    }
309}
310
311impl fmt::Debug for AnnouncementMessage {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        match self {
314            Self::Node(message) => write!(f, "Node({})", message.timestamp),
315            Self::Inventory(message) => {
316                write!(
317                    f,
318                    "Inventory([{}], {})",
319                    message
320                        .inventory
321                        .iter()
322                        .map(|i| i.to_string())
323                        .collect::<Vec<String>>()
324                        .join(", "),
325                    message.timestamp
326                )
327            }
328            Self::Refs(message) => {
329                write!(
330                    f,
331                    "Refs({}, {}, {:?})",
332                    message.rid, message.timestamp, message.refs
333                )
334            }
335        }
336    }
337}
338
339#[derive(Debug, Clone, PartialEq, Eq)]
340pub struct Announcement {
341    /// Node identifier.
342    pub node: NodeId,
343    /// Signature over the announcement.
344    pub signature: crypto::Signature,
345    /// Unsigned node announcement.
346    pub message: AnnouncementMessage,
347}
348
349impl Announcement {
350    /// Proof-of-work parameters for announcements.
351    ///
352    /// These parameters are fed into `scrypt`.
353    /// They represent the `log2(N)`, `r`, `p` parameters, respectively.
354    ///
355    /// * log2(N) – iterations count (affects memory and CPU usage), e.g. 15
356    /// * r – block size (affects memory and CPU usage), e.g. 8
357    /// * p – parallelism factor (threads to run in parallel - affects the memory, CPU usage), usually 1
358    ///
359    /// `15, 8, 1` are usually the recommended parameters.
360    ///
361    #[cfg(debug_assertions)]
362    pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
363    #[cfg(not(debug_assertions))]
364    pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
365    /// Salt used for generating PoW.
366    pub const POW_SALT: &'static [u8] = &[b'r', b'a', b'd'];
367
368    /// Verify this announcement's signature.
369    pub fn verify(&self) -> bool {
370        let msg = wire::serialize(&self.message);
371        self.node.verify(msg, &self.signature).is_ok()
372    }
373
374    pub fn matches(&self, filter: &Filter) -> bool {
375        match &self.message {
376            AnnouncementMessage::Inventory(_) => true,
377            AnnouncementMessage::Node(_) => true,
378            AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
379        }
380    }
381
382    /// Check whether this announcement is of the same variant as another.
383    pub fn variant_eq(&self, other: &Self) -> bool {
384        std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
385    }
386
387    /// Get the announcement timestamp.
388    pub fn timestamp(&self) -> Timestamp {
389        self.message.timestamp()
390    }
391}
392
393/// Message payload.
394/// These are the messages peers send to each other.
395#[derive(Clone, PartialEq, Eq)]
396pub enum Message {
397    /// Subscribe to gossip messages matching the filter and time range.
398    Subscribe(Subscribe),
399
400    /// Gossip announcement. These messages are relayed to peers, and filtered
401    /// using [`Message::Subscribe`].
402    Announcement(Announcement),
403
404    /// Informational message. These messages are sent between peers for information
405    /// and do not need to be acted upon. They can be safely ignored, though handling
406    /// them can be useful for the user.
407    Info(Info),
408
409    /// Ask a connected peer for a Pong.
410    ///
411    /// Used to check if the remote peer is responsive, or a side-effect free way to keep a
412    /// connection alive.
413    Ping(Ping),
414
415    /// Response to `Ping` message.
416    Pong {
417        /// The pong payload.
418        zeroes: ZeroBytes,
419    },
420}
421
422impl PartialOrd for Message {
423    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
424        Some(self.cmp(other))
425    }
426}
427
428impl Ord for Message {
429    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
430        let this = wire::serialize(self);
431        let other = wire::serialize(other);
432
433        this.cmp(&other)
434    }
435}
436
437impl Message {
438    pub fn announcement(
439        node: NodeId,
440        message: impl Into<AnnouncementMessage>,
441        signature: crypto::Signature,
442    ) -> Self {
443        Announcement {
444            node,
445            signature,
446            message: message.into(),
447        }
448        .into()
449    }
450
451    pub fn node<G: crypto::signature::Signer<crypto::Signature>>(
452        message: NodeAnnouncement,
453        signer: &Device<G>,
454    ) -> Self {
455        AnnouncementMessage::from(message).signed(signer).into()
456    }
457
458    pub fn inventory<G: crypto::signature::Signer<crypto::Signature>>(
459        message: InventoryAnnouncement,
460        signer: &Device<G>,
461    ) -> Self {
462        AnnouncementMessage::from(message).signed(signer).into()
463    }
464
465    pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
466        Self::Subscribe(Subscribe {
467            filter,
468            since,
469            until,
470        })
471    }
472
473    pub fn log(&self, level: log::Level, remote: &NodeId, link: Link) {
474        if !log::log_enabled!(level) {
475            return;
476        }
477        let (verb, prep) = if link.is_inbound() {
478            ("Received", "from")
479        } else {
480            ("Sending", "to")
481        };
482        let msg = match self {
483            Self::Announcement(Announcement { node, message, .. }) => match message {
484                AnnouncementMessage::Node(NodeAnnouncement { addresses, timestamp, .. }) => format!(
485                    "{verb} node announcement of {node} with {} address(es) {prep} {remote} (t={timestamp})",
486                    addresses.len()
487                ),
488                AnnouncementMessage::Refs(RefsAnnouncement { rid, refs, timestamp }) => format!(
489                    "{verb} refs announcement of {node} for {rid} with {} remote(s) {prep} {remote} (t={timestamp})",
490                    refs.len()
491                ),
492                AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, timestamp }) => {
493                    format!(
494                        "{verb} inventory announcement of {node} with {} item(s) {prep} {remote} (t={timestamp})",
495                        inventory.len()
496                    )
497                }
498            },
499            Self::Info(Info::RefsAlreadySynced { rid,  .. }) => {
500                format!(
501                    "{verb} `refs-already-synced` info {prep} {remote} for {rid}"
502                )
503            },
504            Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
505            Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
506            Self::Subscribe(Subscribe { .. }) => {
507                format!("{verb} subscription filter {prep} {remote}")
508            }
509        };
510        log::log!(target: "service", level, "{msg}");
511    }
512}
513
514/// A ping message.
515#[derive(Debug, PartialEq, Eq, Clone)]
516pub struct Ping {
517    /// The requested length of the pong message.
518    pub ponglen: wire::Size,
519    /// Zero bytes (ignored).
520    pub zeroes: ZeroBytes,
521}
522
523impl Ping {
524    /// Maximum number of zero bytes in a ping message.
525    pub const MAX_PING_ZEROES: wire::Size = Message::MAX_SIZE // Message size without the type.
526        - mem::size_of::<wire::Size>() as wire::Size // Account for pong length.
527        - mem::size_of::<wire::Size>() as wire::Size; // Account for zeroes length prefix.
528
529    /// Maximum number of zero bytes in a pong message.
530    pub const MAX_PONG_ZEROES: wire::Size =
531        Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size; // Account for zeroes length
532                                                                        // prefix.
533
534    pub fn new(rng: &mut fastrand::Rng) -> Self {
535        let ponglen = rng.u16(0..Self::MAX_PONG_ZEROES);
536
537        Ping {
538            ponglen,
539            zeroes: ZeroBytes::new(rng.u16(0..Self::MAX_PING_ZEROES)),
540        }
541    }
542}
543
544impl From<Announcement> for Message {
545    fn from(ann: Announcement) -> Self {
546        Self::Announcement(ann)
547    }
548}
549
550impl From<Info> for Message {
551    fn from(info: Info) -> Self {
552        Self::Info(info)
553    }
554}
555
556impl fmt::Debug for Message {
557    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
558        match self {
559            Self::Subscribe(Subscribe { since, until, .. }) => {
560                write!(f, "Subscribe({since}..{until})")
561            }
562            Self::Announcement(Announcement { node, message, .. }) => {
563                write!(f, "Announcement({node}, {message:?})")
564            }
565            Self::Info(info) => {
566                write!(f, "Info({info:?})")
567            }
568            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
569            Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
570        }
571    }
572}
573
574/// Represents a vector of zeroes of a certain length.
575#[derive(Clone, Debug, PartialEq, Eq)]
576pub struct ZeroBytes(wire::Size);
577
578impl ZeroBytes {
579    pub fn new(size: wire::Size) -> Self {
580        ZeroBytes(size)
581    }
582
583    pub fn is_empty(&self) -> bool {
584        self.0 == 0
585    }
586
587    pub fn len(&self) -> usize {
588        self.0.into()
589    }
590}
591
592#[cfg(test)]
593#[allow(clippy::unwrap_used)]
594mod tests {
595    use std::str::FromStr;
596
597    use fastrand;
598    use qcheck_macros::quickcheck;
599    use radicle::git::raw;
600
601    use super::*;
602    use crate::prelude::*;
603    use crate::test::arbitrary;
604    use crate::wire::Encode;
605
606    #[test]
607    fn test_ref_remote_limit() {
608        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
609        let signer = Device::mock();
610        let at = raw::Oid::zero().into();
611
612        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);
613
614        for _ in 0..refs.capacity() {
615            refs.push(RefsAt {
616                remote: *signer.public_key(),
617                at,
618            })
619            .unwrap();
620        }
621
622        let msg: Message = AnnouncementMessage::from(RefsAnnouncement {
623            rid: arbitrary::gen(1),
624            refs,
625            timestamp: LocalTime::now().into(),
626        })
627        .signed(&Device::mock())
628        .into();
629
630        let mut buf: Vec<u8> = Vec::new();
631        assert!(msg.encode(&mut buf).is_ok());
632
633        let decoded = wire::deserialize(buf.as_slice());
634        assert!(decoded.is_ok());
635        assert_eq!(msg, decoded.unwrap());
636    }
637
638    #[test]
639    fn test_inventory_limit() {
640        let msg = Message::inventory(
641            InventoryAnnouncement {
642                inventory: arbitrary::vec(INVENTORY_LIMIT)
643                    .try_into()
644                    .expect("size within bounds limit"),
645                timestamp: LocalTime::now().into(),
646            },
647            &Device::mock(),
648        );
649        let mut buf: Vec<u8> = Vec::new();
650        assert!(
651            msg.encode(&mut buf).is_ok(),
652            "INVENTORY_LIMIT is a valid limit for encoding",
653        );
654
655        let decoded = wire::deserialize(buf.as_slice());
656        assert!(
657            decoded.is_ok(),
658            "INVENTORY_LIMIT is a valid limit for decoding"
659        );
660        assert_eq!(
661            msg,
662            decoded.unwrap(),
663            "encoding and decoding should be safe for message at INVENTORY_LIMIT",
664        );
665    }
666
667    #[quickcheck]
668    fn prop_refs_announcement_signing(rid: RepoId) {
669        let signer = Device::mock_rng(&mut fastrand::Rng::new());
670        let timestamp = Timestamp::EPOCH;
671        let at = raw::Oid::zero().into();
672        let refs = BoundedVec::collect_from(
673            &mut [RefsAt {
674                remote: *signer.public_key(),
675                at,
676            }]
677            .into_iter(),
678        );
679        let message = AnnouncementMessage::Refs(RefsAnnouncement {
680            rid,
681            refs,
682            timestamp,
683        });
684        let ann = message.signed(&signer);
685
686        assert!(ann.verify());
687    }
688
689    #[test]
690    fn test_node_announcement_validate() {
691        let ann = NodeAnnouncement {
692            version: 1,
693            features: node::Features::SEED,
694            timestamp: Timestamp::try_from(42491841u64).unwrap(),
695            alias: Alias::new("alice"),
696            addresses: BoundedVec::new(),
697            nonce: 0,
698            agent: UserAgent::from_str("/heartwood:1.0.0/").unwrap(),
699        };
700
701        assert_eq!(ann.work(), 1);
702        assert_eq!(ann.clone().solve(1).unwrap().work(), 1);
703        assert_eq!(ann.clone().solve(8).unwrap().work(), 10);
704        assert_eq!(ann.solve(14).unwrap().work(), 14);
705    }
706}