radicle_node/service/
message.rs

1use std::{fmt, io, mem};
2
3use nonempty::NonEmpty;
4use radicle::git;
5use radicle::storage::refs::RefsAt;
6
7use crate::crypto;
8use crate::identity::RepoId;
9use crate::node;
10use crate::node::{Address, Alias, UserAgent};
11use crate::prelude::BoundedVec;
12use crate::service::filter::Filter;
13use crate::service::{Link, NodeId, Timestamp};
14use crate::storage;
15use crate::wire;
16
/// Upper bound on the number of addresses a node may announce to other nodes.
pub const ADDRESS_LIMIT: usize = 16;
/// Upper bound on the number of repository remotes carried by a single
/// [`RefsAnnouncement`] message.
pub const REF_REMOTE_LIMIT: usize = 1024;
/// Upper bound on the number of inventory items (repository identifiers) a node
/// may announce to other nodes.
pub const INVENTORY_LIMIT: usize = 2973;
23
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct Subscribe {
26    /// Subscribe to events matching this filter.
27    pub filter: Filter,
28    /// Request messages since this time.
29    pub since: Timestamp,
30    /// Request messages until this time.
31    pub until: Timestamp,
32}
33
34impl Subscribe {
35    pub fn all() -> Self {
36        Self {
37            filter: Filter::default(),
38            since: Timestamp::MIN,
39            until: Timestamp::MAX,
40        }
41    }
42}
43
44/// Node announcing itself to the network.
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct NodeAnnouncement {
47    /// Supported protocol version.
48    pub version: u8,
49    /// Advertized features.
50    pub features: node::Features,
51    /// Monotonic timestamp.
52    pub timestamp: Timestamp,
53    /// Non-unique alias.
54    pub alias: Alias,
55    /// Announced addresses.
56    pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
57    /// Nonce used for announcement proof-of-work.
58    pub nonce: u64,
59    /// User-agent string.
60    pub agent: UserAgent,
61}
62
63impl NodeAnnouncement {
64    /// Calculate the amount of work that went into creating this announcement.
65    ///
66    /// Proof-of-work uses the [`scrypt`] algorithm with the parameters in
67    /// [`Announcement::POW_PARAMS`]. The "work" is calculated by counting the number of leading
68    /// zero bits after running `scrypt` on a serialized [`NodeAnnouncement`] using
69    /// [`wire::serialize`].
70    ///
71    /// In other words, `work = leading-zeros(scrypt(serialize(announcement)))`.
72    ///
73    /// Higher numbers mean higher difficulty. For each increase in work, difficulty is doubled.
74    /// For instance, an output of `7` is *four* times more work than an output of `5`.
75    ///
76    pub fn work(&self) -> u32 {
77        let (n, r, p) = Announcement::POW_PARAMS;
78        let params = scrypt::Params::new(n, r, p, 32).expect("proof-of-work parameters are valid");
79        let mut output = [0u8; 32];
80
81        scrypt::scrypt(
82            wire::serialize(self).as_ref(),
83            Announcement::POW_SALT,
84            &params,
85            &mut output,
86        )
87        .expect("proof-of-work output vector is a valid length");
88
89        // Calculate the number of leading zero bits in the output vector.
90        if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|(_, &x)| x != 0) {
91            zero_bytes as u32 * 8 + non_zero.leading_zeros()
92        } else {
93            output.len() as u32 * 8
94        }
95    }
96
97    /// Solve the proof-of-work of a node announcement for the given target, by iterating through
98    /// different nonces.
99    ///
100    /// If the given difficulty target is too high, there may not be a result. In that case, `None`
101    /// is returned.
102    pub fn solve(mut self, target: u32) -> Option<Self> {
103        loop {
104            if let Some(nonce) = self.nonce.checked_add(1) {
105                self.nonce = nonce;
106
107                if self.work() >= target {
108                    break;
109                }
110            } else {
111                return None;
112            }
113        }
114        Some(self)
115    }
116}
117
118impl wire::Encode for NodeAnnouncement {
119    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
120        let mut n = 0;
121
122        n += self.version.encode(writer)?;
123        n += self.features.encode(writer)?;
124        n += self.timestamp.encode(writer)?;
125        n += self.alias.encode(writer)?;
126        n += self.addresses.encode(writer)?;
127        n += self.nonce.encode(writer)?;
128        n += self.agent.encode(writer)?;
129
130        Ok(n)
131    }
132}
133
134impl wire::Decode for NodeAnnouncement {
135    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
136        let version = u8::decode(reader)?;
137        let features = node::Features::decode(reader)?;
138        let timestamp = Timestamp::decode(reader)?;
139        let alias = wire::Decode::decode(reader)?;
140        let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(reader)?;
141        let nonce = u64::decode(reader)?;
142        let agent = match UserAgent::decode(reader) {
143            Ok(ua) => ua,
144            Err(e) if e.is_eof() => UserAgent::default(),
145            Err(e) => return Err(e),
146        };
147
148        Ok(Self {
149            version,
150            features,
151            timestamp,
152            alias,
153            addresses,
154            nonce,
155            agent,
156        })
157    }
158}
159
160/// Node announcing project refs being created or updated.
161#[derive(Debug, Clone, PartialEq, Eq)]
162pub struct RefsAnnouncement {
163    /// Repository identifier.
164    pub rid: RepoId,
165    /// Updated `rad/sigrefs`.
166    pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
167    /// Time of announcement.
168    pub timestamp: Timestamp,
169}
170
171/// Track the status of `RefsAt` within a given repository.
172#[derive(Default)]
173pub struct RefsStatus {
174    /// The `rad/sigrefs` was missing or it's ahead of the local
175    /// `rad/sigrefs`. We want it.
176    pub want: Vec<RefsAt>,
177    /// The `rad/sigrefs` has been seen before. We already have it.
178    pub have: Vec<RefsAt>,
179}
180
181impl RefsStatus {
182    /// Get the set of `want` and `have` `RefsAt`'s for the given
183    /// announcement.
184    ///
185    /// Nb. We use the refs database as a cache for quick lookups. This does *not* check
186    /// for ancestry matches, since we don't cache the whole history (only the tips).
187    /// This, however, is not a problem because the signed refs branch is fast-forward only,
188    /// and old refs announcements will be discarded due to their lower timestamps.
189    pub fn new<D: node::refs::Store>(
190        rid: RepoId,
191        refs: NonEmpty<RefsAt>,
192        db: &D,
193    ) -> Result<RefsStatus, storage::Error> {
194        let mut status = RefsStatus::default();
195        for theirs in refs.iter() {
196            status.insert(&rid, *theirs, db)?;
197        }
198        Ok(status)
199    }
200
201    fn insert<D: node::refs::Store>(
202        &mut self,
203        repo: &RepoId,
204        theirs: RefsAt,
205        db: &D,
206    ) -> Result<(), storage::Error> {
207        match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
208            Ok(Some((ours, _))) => {
209                if theirs.at != ours {
210                    self.want.push(theirs);
211                } else {
212                    self.have.push(theirs);
213                }
214            }
215            Ok(None) => {
216                self.want.push(theirs);
217            }
218            Err(e) => {
219                log::warn!(
220                    target: "service",
221                    "Error getting cached ref of {repo} for refs status: {e}"
222                );
223            }
224        }
225        Ok(())
226    }
227}
228
229/// Node announcing its inventory to the network.
230/// This should be the whole inventory every time.
231#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct InventoryAnnouncement {
233    /// Node inventory.
234    pub inventory: BoundedVec<RepoId, INVENTORY_LIMIT>,
235    /// Time of announcement.
236    pub timestamp: Timestamp,
237}
238
239/// Node announcing information to a connected peer.
240///
241/// This should not be relayed and should be used to send an
242/// informational message a peer.
243#[derive(Debug, Clone, PartialEq, Eq)]
244pub enum Info {
245    /// Tell a node that sent a refs announcement that it was already synced at the given `Oid`,
246    /// for this particular `rid`.
247    RefsAlreadySynced { rid: RepoId, at: git::Oid },
248}
249
250/// Announcement messages are messages that are relayed between peers.
251#[derive(Clone, PartialEq, Eq)]
252pub enum AnnouncementMessage {
253    /// Inventory announcement.
254    Inventory(InventoryAnnouncement),
255    /// Node announcement.
256    Node(NodeAnnouncement),
257    /// Refs announcement.
258    Refs(RefsAnnouncement),
259}
260
261impl AnnouncementMessage {
262    /// Sign this announcement message.
263    pub fn signed<G: crypto::Signer>(self, signer: &G) -> Announcement {
264        let msg = wire::serialize(&self);
265        let signature = signer.sign(&msg);
266
267        Announcement {
268            node: *signer.public_key(),
269            message: self,
270            signature,
271        }
272    }
273
274    pub fn timestamp(&self) -> Timestamp {
275        match self {
276            Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
277            Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
278            Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
279        }
280    }
281
282    pub fn is_node_announcement(&self) -> bool {
283        matches!(self, Self::Node(_))
284    }
285}
286
287impl From<NodeAnnouncement> for AnnouncementMessage {
288    fn from(ann: NodeAnnouncement) -> Self {
289        Self::Node(ann)
290    }
291}
292
293impl From<InventoryAnnouncement> for AnnouncementMessage {
294    fn from(ann: InventoryAnnouncement) -> Self {
295        Self::Inventory(ann)
296    }
297}
298
299impl From<RefsAnnouncement> for AnnouncementMessage {
300    fn from(ann: RefsAnnouncement) -> Self {
301        Self::Refs(ann)
302    }
303}
304
305impl fmt::Debug for AnnouncementMessage {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        match self {
308            Self::Node(message) => write!(f, "Node({})", message.timestamp),
309            Self::Inventory(message) => {
310                write!(
311                    f,
312                    "Inventory([{}], {})",
313                    message
314                        .inventory
315                        .iter()
316                        .map(|i| i.to_string())
317                        .collect::<Vec<String>>()
318                        .join(", "),
319                    message.timestamp
320                )
321            }
322            Self::Refs(message) => {
323                write!(
324                    f,
325                    "Refs({}, {}, {:?})",
326                    message.rid, message.timestamp, message.refs
327                )
328            }
329        }
330    }
331}
332
333#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct Announcement {
335    /// Node identifier.
336    pub node: NodeId,
337    /// Signature over the announcement.
338    pub signature: crypto::Signature,
339    /// Unsigned node announcement.
340    pub message: AnnouncementMessage,
341}
342
343impl Announcement {
344    /// Proof-of-work parameters for announcements.
345    ///
346    /// These parameters are fed into `scrypt`.
347    /// They represent the `log2(N)`, `r`, `p` parameters, respectively.
348    ///
349    /// * log2(N) – iterations count (affects memory and CPU usage), e.g. 15
350    /// * r – block size (affects memory and CPU usage), e.g. 8
351    /// * p – parallelism factor (threads to run in parallel - affects the memory, CPU usage), usually 1
352    ///
353    /// `15, 8, 1` are usually the recommended parameters.
354    ///
355    #[cfg(debug_assertions)]
356    pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
357    #[cfg(not(debug_assertions))]
358    pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
359    /// Salt used for generating PoW.
360    pub const POW_SALT: &'static [u8] = &[b'r', b'a', b'd'];
361
362    /// Verify this announcement's signature.
363    pub fn verify(&self) -> bool {
364        let msg = wire::serialize(&self.message);
365        self.node.verify(msg, &self.signature).is_ok()
366    }
367
368    pub fn matches(&self, filter: &Filter) -> bool {
369        match &self.message {
370            AnnouncementMessage::Inventory(_) => true,
371            AnnouncementMessage::Node(_) => true,
372            AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
373        }
374    }
375
376    /// Check whether this announcement is of the same variant as another.
377    pub fn variant_eq(&self, other: &Self) -> bool {
378        std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
379    }
380
381    /// Get the announcement timestamp.
382    pub fn timestamp(&self) -> Timestamp {
383        self.message.timestamp()
384    }
385}
386
387/// Message payload.
388/// These are the messages peers send to each other.
389#[derive(Clone, PartialEq, Eq)]
390pub enum Message {
391    /// Subscribe to gossip messages matching the filter and time range.
392    Subscribe(Subscribe),
393
394    /// Gossip announcement. These messages are relayed to peers, and filtered
395    /// using [`Message::Subscribe`].
396    Announcement(Announcement),
397
398    /// Informational message. These messages are sent between peers for information
399    /// and do not need to be acted upon. They can be safely ignored, though handling
400    /// them can be useful for the user.
401    Info(Info),
402
403    /// Ask a connected peer for a Pong.
404    ///
405    /// Used to check if the remote peer is responsive, or a side-effect free way to keep a
406    /// connection alive.
407    Ping(Ping),
408
409    /// Response to `Ping` message.
410    Pong {
411        /// The pong payload.
412        zeroes: ZeroBytes,
413    },
414}
415
416impl PartialOrd for Message {
417    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
418        Some(self.cmp(other))
419    }
420}
421
422impl Ord for Message {
423    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
424        let this = wire::serialize(self);
425        let other = wire::serialize(other);
426
427        this.cmp(&other)
428    }
429}
430
431impl Message {
432    pub fn announcement(
433        node: NodeId,
434        message: impl Into<AnnouncementMessage>,
435        signature: crypto::Signature,
436    ) -> Self {
437        Announcement {
438            node,
439            signature,
440            message: message.into(),
441        }
442        .into()
443    }
444
445    pub fn node<G: crypto::Signer>(message: NodeAnnouncement, signer: &G) -> Self {
446        AnnouncementMessage::from(message).signed(signer).into()
447    }
448
449    pub fn inventory<G: crypto::Signer>(message: InventoryAnnouncement, signer: &G) -> Self {
450        AnnouncementMessage::from(message).signed(signer).into()
451    }
452
453    pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
454        Self::Subscribe(Subscribe {
455            filter,
456            since,
457            until,
458        })
459    }
460
461    pub fn log(&self, level: log::Level, remote: &NodeId, link: Link) {
462        if !log::log_enabled!(level) {
463            return;
464        }
465        let (verb, prep) = if link.is_inbound() {
466            ("Received", "from")
467        } else {
468            ("Sending", "to")
469        };
470        let msg = match self {
471            Self::Announcement(Announcement { node, message, .. }) => match message {
472                AnnouncementMessage::Node(NodeAnnouncement { addresses, timestamp, .. }) => format!(
473                    "{verb} node announcement of {node} with {} address(es) {prep} {remote} (t={timestamp})",
474                    addresses.len()
475                ),
476                AnnouncementMessage::Refs(RefsAnnouncement { rid, refs, timestamp }) => format!(
477                    "{verb} refs announcement of {node} for {rid} with {} remote(s) {prep} {remote} (t={timestamp})",
478                    refs.len()
479                ),
480                AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, timestamp }) => {
481                    format!(
482                        "{verb} inventory announcement of {node} with {} item(s) {prep} {remote} (t={timestamp})",
483                        inventory.len()
484                    )
485                }
486            },
487            Self::Info(Info::RefsAlreadySynced { rid,  .. }) => {
488                format!(
489                    "{verb} `refs-already-synced` info {prep} {remote} for {rid}"
490                )
491            },
492            Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
493            Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
494            Self::Subscribe(Subscribe { .. }) => {
495                format!("{verb} subscription filter {prep} {remote}")
496            }
497        };
498        log::log!(target: "service", level, "{msg}");
499    }
500}
501
502/// A ping message.
503#[derive(Debug, PartialEq, Eq, Clone)]
504pub struct Ping {
505    /// The requested length of the pong message.
506    pub ponglen: wire::Size,
507    /// Zero bytes (ignored).
508    pub zeroes: ZeroBytes,
509}
510
511impl Ping {
512    /// Maximum number of zero bytes in a ping message.
513    pub const MAX_PING_ZEROES: wire::Size = Message::MAX_SIZE // Message size without the type.
514        - mem::size_of::<wire::Size>() as wire::Size // Account for pong length.
515        - mem::size_of::<wire::Size>() as wire::Size; // Account for zeroes length prefix.
516
517    /// Maximum number of zero bytes in a pong message.
518    pub const MAX_PONG_ZEROES: wire::Size =
519        Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size; // Account for zeroes length
520                                                                        // prefix.
521
522    pub fn new(rng: &mut fastrand::Rng) -> Self {
523        let ponglen = rng.u16(0..Self::MAX_PONG_ZEROES);
524
525        Ping {
526            ponglen,
527            zeroes: ZeroBytes::new(rng.u16(0..Self::MAX_PING_ZEROES)),
528        }
529    }
530}
531
532impl From<Announcement> for Message {
533    fn from(ann: Announcement) -> Self {
534        Self::Announcement(ann)
535    }
536}
537
538impl From<Info> for Message {
539    fn from(info: Info) -> Self {
540        Self::Info(info)
541    }
542}
543
544impl fmt::Debug for Message {
545    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546        match self {
547            Self::Subscribe(Subscribe { since, until, .. }) => {
548                write!(f, "Subscribe({since}..{until})")
549            }
550            Self::Announcement(Announcement { node, message, .. }) => {
551                write!(f, "Announcement({node}, {message:?})")
552            }
553            Self::Info(info) => {
554                write!(f, "Info({info:?})")
555            }
556            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
557            Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
558        }
559    }
560}
561
562/// Represents a vector of zeroes of a certain length.
563#[derive(Clone, Debug, PartialEq, Eq)]
564pub struct ZeroBytes(wire::Size);
565
566impl ZeroBytes {
567    pub fn new(size: wire::Size) -> Self {
568        ZeroBytes(size)
569    }
570
571    pub fn is_empty(&self) -> bool {
572        self.0 == 0
573    }
574
575    pub fn len(&self) -> usize {
576        self.0.into()
577    }
578}
579
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use std::str::FromStr;

    use fastrand;
    use qcheck_macros::quickcheck;
    use radicle::git::raw;

    use super::*;
    use crate::crypto::test::signer::MockSigner;
    use crate::prelude::*;
    use crate::test::arbitrary;
    use crate::wire::Encode;

    /// A refs announcement filled up to `REF_REMOTE_LIMIT` must survive an
    /// encode/decode round trip.
    #[test]
    fn test_ref_remote_limit() {
        let signer = MockSigner::default();
        let at = raw::Oid::zero().into();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);

        // Fill the vector to capacity with copies of the same entry.
        for _ in 0..refs.capacity() {
            let entry = RefsAt {
                remote: *signer.public_key(),
                at,
            };
            refs.push(entry).unwrap();
        }

        let announcement = RefsAnnouncement {
            rid: arbitrary::gen(1),
            refs,
            timestamp: LocalTime::now().into(),
        };
        let msg: Message = AnnouncementMessage::from(announcement)
            .signed(&MockSigner::default())
            .into();

        let mut buf: Vec<u8> = Vec::new();
        assert!(msg.encode(&mut buf).is_ok());

        let decoded = wire::deserialize(buf.as_slice());
        assert!(decoded.is_ok());
        assert_eq!(msg, decoded.unwrap());
    }

    /// An inventory announcement holding `INVENTORY_LIMIT` items must survive
    /// an encode/decode round trip.
    #[test]
    fn test_inventory_limit() {
        let announcement = InventoryAnnouncement {
            inventory: arbitrary::vec(INVENTORY_LIMIT)
                .try_into()
                .expect("size within bounds limit"),
            timestamp: LocalTime::now().into(),
        };
        let msg = Message::inventory(announcement, &MockSigner::default());

        let mut buf: Vec<u8> = Vec::new();
        assert!(
            msg.encode(&mut buf).is_ok(),
            "INVENTORY_LIMIT is a valid limit for encoding",
        );

        let decoded = wire::deserialize(buf.as_slice());
        assert!(
            decoded.is_ok(),
            "INVENTORY_LIMIT is a valid limit for decoding"
        );
        assert_eq!(
            msg,
            decoded.unwrap(),
            "encoding and decoding should be safe for message at INVENTORY_LIMIT",
        );
    }

    /// A refs announcement signed by a signer must verify, for any repository
    /// identifier.
    #[quickcheck]
    fn prop_refs_announcement_signing(rid: RepoId) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
        let timestamp = Timestamp::EPOCH;
        let at = raw::Oid::zero().into();
        let entry = RefsAt {
            remote: *signer.public_key(),
            at,
        };
        let refs = BoundedVec::collect_from(&mut [entry].into_iter());
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
            rid,
            refs,
            timestamp,
        });
        let ann = message.signed(&signer);

        assert!(ann.verify());
    }

    /// Pins the proof-of-work of a fixed announcement, before and after solving
    /// for several targets.
    #[test]
    fn test_node_announcement_validate() {
        let ann = NodeAnnouncement {
            version: 1,
            features: node::Features::SEED,
            timestamp: Timestamp::try_from(42491841u64).unwrap(),
            alias: Alias::new("alice"),
            addresses: BoundedVec::new(),
            nonce: 0,
            agent: UserAgent::from_str("/heartwood:1.0.0/").unwrap(),
        };

        assert_eq!(ann.work(), 1);

        let solved_1 = ann.clone().solve(1).unwrap();
        assert_eq!(solved_1.work(), 1);

        let solved_8 = ann.clone().solve(8).unwrap();
        assert_eq!(solved_8.work(), 10);

        let solved_14 = ann.solve(14).unwrap();
        assert_eq!(solved_14.work(), 14);
    }
}