1use std::{fmt, io, mem};
2
3use nonempty::NonEmpty;
4use radicle::git;
5use radicle::storage::refs::RefsAt;
6
7use crate::crypto;
8use crate::identity::RepoId;
9use crate::node;
10use crate::node::{Address, Alias, UserAgent};
11use crate::prelude::BoundedVec;
12use crate::service::filter::Filter;
13use crate::service::{Link, NodeId, Timestamp};
14use crate::storage;
15use crate::wire;
16
/// Upper bound on the number of addresses carried by a [`NodeAnnouncement`].
pub const ADDRESS_LIMIT: usize = 16;
/// Upper bound on the number of [`RefsAt`] entries carried by a [`RefsAnnouncement`].
pub const REF_REMOTE_LIMIT: usize = 1024;
/// Upper bound on the number of repository ids carried by an [`InventoryAnnouncement`].
pub const INVENTORY_LIMIT: usize = 2973;
23
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct Subscribe {
26 pub filter: Filter,
28 pub since: Timestamp,
30 pub until: Timestamp,
32}
33
34impl Subscribe {
35 pub fn all() -> Self {
36 Self {
37 filter: Filter::default(),
38 since: Timestamp::MIN,
39 until: Timestamp::MAX,
40 }
41 }
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct NodeAnnouncement {
47 pub version: u8,
49 pub features: node::Features,
51 pub timestamp: Timestamp,
53 pub alias: Alias,
55 pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
57 pub nonce: u64,
59 pub agent: UserAgent,
61}
62
63impl NodeAnnouncement {
64 pub fn work(&self) -> u32 {
77 let (n, r, p) = Announcement::POW_PARAMS;
78 let params = scrypt::Params::new(n, r, p, 32).expect("proof-of-work parameters are valid");
79 let mut output = [0u8; 32];
80
81 scrypt::scrypt(
82 wire::serialize(self).as_ref(),
83 Announcement::POW_SALT,
84 ¶ms,
85 &mut output,
86 )
87 .expect("proof-of-work output vector is a valid length");
88
89 if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|(_, &x)| x != 0) {
91 zero_bytes as u32 * 8 + non_zero.leading_zeros()
92 } else {
93 output.len() as u32 * 8
94 }
95 }
96
97 pub fn solve(mut self, target: u32) -> Option<Self> {
103 loop {
104 if let Some(nonce) = self.nonce.checked_add(1) {
105 self.nonce = nonce;
106
107 if self.work() >= target {
108 break;
109 }
110 } else {
111 return None;
112 }
113 }
114 Some(self)
115 }
116}
117
118impl wire::Encode for NodeAnnouncement {
119 fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
120 let mut n = 0;
121
122 n += self.version.encode(writer)?;
123 n += self.features.encode(writer)?;
124 n += self.timestamp.encode(writer)?;
125 n += self.alias.encode(writer)?;
126 n += self.addresses.encode(writer)?;
127 n += self.nonce.encode(writer)?;
128 n += self.agent.encode(writer)?;
129
130 Ok(n)
131 }
132}
133
134impl wire::Decode for NodeAnnouncement {
135 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
136 let version = u8::decode(reader)?;
137 let features = node::Features::decode(reader)?;
138 let timestamp = Timestamp::decode(reader)?;
139 let alias = wire::Decode::decode(reader)?;
140 let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(reader)?;
141 let nonce = u64::decode(reader)?;
142 let agent = match UserAgent::decode(reader) {
143 Ok(ua) => ua,
144 Err(e) if e.is_eof() => UserAgent::default(),
145 Err(e) => return Err(e),
146 };
147
148 Ok(Self {
149 version,
150 features,
151 timestamp,
152 alias,
153 addresses,
154 nonce,
155 agent,
156 })
157 }
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
162pub struct RefsAnnouncement {
163 pub rid: RepoId,
165 pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
167 pub timestamp: Timestamp,
169}
170
171#[derive(Default)]
173pub struct RefsStatus {
174 pub want: Vec<RefsAt>,
177 pub have: Vec<RefsAt>,
179}
180
181impl RefsStatus {
182 pub fn new<D: node::refs::Store>(
190 rid: RepoId,
191 refs: NonEmpty<RefsAt>,
192 db: &D,
193 ) -> Result<RefsStatus, storage::Error> {
194 let mut status = RefsStatus::default();
195 for theirs in refs.iter() {
196 status.insert(&rid, *theirs, db)?;
197 }
198 Ok(status)
199 }
200
201 fn insert<D: node::refs::Store>(
202 &mut self,
203 repo: &RepoId,
204 theirs: RefsAt,
205 db: &D,
206 ) -> Result<(), storage::Error> {
207 match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
208 Ok(Some((ours, _))) => {
209 if theirs.at != ours {
210 self.want.push(theirs);
211 } else {
212 self.have.push(theirs);
213 }
214 }
215 Ok(None) => {
216 self.want.push(theirs);
217 }
218 Err(e) => {
219 log::warn!(
220 target: "service",
221 "Error getting cached ref of {repo} for refs status: {e}"
222 );
223 }
224 }
225 Ok(())
226 }
227}
228
229#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct InventoryAnnouncement {
233 pub inventory: BoundedVec<RepoId, INVENTORY_LIMIT>,
235 pub timestamp: Timestamp,
237}
238
239#[derive(Debug, Clone, PartialEq, Eq)]
244pub enum Info {
245 RefsAlreadySynced { rid: RepoId, at: git::Oid },
248}
249
250#[derive(Clone, PartialEq, Eq)]
252pub enum AnnouncementMessage {
253 Inventory(InventoryAnnouncement),
255 Node(NodeAnnouncement),
257 Refs(RefsAnnouncement),
259}
260
261impl AnnouncementMessage {
262 pub fn signed<G: crypto::Signer>(self, signer: &G) -> Announcement {
264 let msg = wire::serialize(&self);
265 let signature = signer.sign(&msg);
266
267 Announcement {
268 node: *signer.public_key(),
269 message: self,
270 signature,
271 }
272 }
273
274 pub fn timestamp(&self) -> Timestamp {
275 match self {
276 Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
277 Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
278 Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
279 }
280 }
281
282 pub fn is_node_announcement(&self) -> bool {
283 matches!(self, Self::Node(_))
284 }
285}
286
287impl From<NodeAnnouncement> for AnnouncementMessage {
288 fn from(ann: NodeAnnouncement) -> Self {
289 Self::Node(ann)
290 }
291}
292
293impl From<InventoryAnnouncement> for AnnouncementMessage {
294 fn from(ann: InventoryAnnouncement) -> Self {
295 Self::Inventory(ann)
296 }
297}
298
299impl From<RefsAnnouncement> for AnnouncementMessage {
300 fn from(ann: RefsAnnouncement) -> Self {
301 Self::Refs(ann)
302 }
303}
304
305impl fmt::Debug for AnnouncementMessage {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 match self {
308 Self::Node(message) => write!(f, "Node({})", message.timestamp),
309 Self::Inventory(message) => {
310 write!(
311 f,
312 "Inventory([{}], {})",
313 message
314 .inventory
315 .iter()
316 .map(|i| i.to_string())
317 .collect::<Vec<String>>()
318 .join(", "),
319 message.timestamp
320 )
321 }
322 Self::Refs(message) => {
323 write!(
324 f,
325 "Refs({}, {}, {:?})",
326 message.rid, message.timestamp, message.refs
327 )
328 }
329 }
330 }
331}
332
333#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct Announcement {
335 pub node: NodeId,
337 pub signature: crypto::Signature,
339 pub message: AnnouncementMessage,
341}
342
343impl Announcement {
344 #[cfg(debug_assertions)]
356 pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
357 #[cfg(not(debug_assertions))]
358 pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
359 pub const POW_SALT: &'static [u8] = &[b'r', b'a', b'd'];
361
362 pub fn verify(&self) -> bool {
364 let msg = wire::serialize(&self.message);
365 self.node.verify(msg, &self.signature).is_ok()
366 }
367
368 pub fn matches(&self, filter: &Filter) -> bool {
369 match &self.message {
370 AnnouncementMessage::Inventory(_) => true,
371 AnnouncementMessage::Node(_) => true,
372 AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
373 }
374 }
375
376 pub fn variant_eq(&self, other: &Self) -> bool {
378 std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
379 }
380
381 pub fn timestamp(&self) -> Timestamp {
383 self.message.timestamp()
384 }
385}
386
387#[derive(Clone, PartialEq, Eq)]
390pub enum Message {
391 Subscribe(Subscribe),
393
394 Announcement(Announcement),
397
398 Info(Info),
402
403 Ping(Ping),
408
409 Pong {
411 zeroes: ZeroBytes,
413 },
414}
415
416impl PartialOrd for Message {
417 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
418 Some(self.cmp(other))
419 }
420}
421
422impl Ord for Message {
423 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
424 let this = wire::serialize(self);
425 let other = wire::serialize(other);
426
427 this.cmp(&other)
428 }
429}
430
431impl Message {
432 pub fn announcement(
433 node: NodeId,
434 message: impl Into<AnnouncementMessage>,
435 signature: crypto::Signature,
436 ) -> Self {
437 Announcement {
438 node,
439 signature,
440 message: message.into(),
441 }
442 .into()
443 }
444
445 pub fn node<G: crypto::Signer>(message: NodeAnnouncement, signer: &G) -> Self {
446 AnnouncementMessage::from(message).signed(signer).into()
447 }
448
449 pub fn inventory<G: crypto::Signer>(message: InventoryAnnouncement, signer: &G) -> Self {
450 AnnouncementMessage::from(message).signed(signer).into()
451 }
452
453 pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
454 Self::Subscribe(Subscribe {
455 filter,
456 since,
457 until,
458 })
459 }
460
461 pub fn log(&self, level: log::Level, remote: &NodeId, link: Link) {
462 if !log::log_enabled!(level) {
463 return;
464 }
465 let (verb, prep) = if link.is_inbound() {
466 ("Received", "from")
467 } else {
468 ("Sending", "to")
469 };
470 let msg = match self {
471 Self::Announcement(Announcement { node, message, .. }) => match message {
472 AnnouncementMessage::Node(NodeAnnouncement { addresses, timestamp, .. }) => format!(
473 "{verb} node announcement of {node} with {} address(es) {prep} {remote} (t={timestamp})",
474 addresses.len()
475 ),
476 AnnouncementMessage::Refs(RefsAnnouncement { rid, refs, timestamp }) => format!(
477 "{verb} refs announcement of {node} for {rid} with {} remote(s) {prep} {remote} (t={timestamp})",
478 refs.len()
479 ),
480 AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, timestamp }) => {
481 format!(
482 "{verb} inventory announcement of {node} with {} item(s) {prep} {remote} (t={timestamp})",
483 inventory.len()
484 )
485 }
486 },
487 Self::Info(Info::RefsAlreadySynced { rid, .. }) => {
488 format!(
489 "{verb} `refs-already-synced` info {prep} {remote} for {rid}"
490 )
491 },
492 Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
493 Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
494 Self::Subscribe(Subscribe { .. }) => {
495 format!("{verb} subscription filter {prep} {remote}")
496 }
497 };
498 log::log!(target: "service", level, "{msg}");
499 }
500}
501
502#[derive(Debug, PartialEq, Eq, Clone)]
504pub struct Ping {
505 pub ponglen: wire::Size,
507 pub zeroes: ZeroBytes,
509}
510
511impl Ping {
512 pub const MAX_PING_ZEROES: wire::Size = Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size - mem::size_of::<wire::Size>() as wire::Size; pub const MAX_PONG_ZEROES: wire::Size =
519 Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size; pub fn new(rng: &mut fastrand::Rng) -> Self {
523 let ponglen = rng.u16(0..Self::MAX_PONG_ZEROES);
524
525 Ping {
526 ponglen,
527 zeroes: ZeroBytes::new(rng.u16(0..Self::MAX_PING_ZEROES)),
528 }
529 }
530}
531
532impl From<Announcement> for Message {
533 fn from(ann: Announcement) -> Self {
534 Self::Announcement(ann)
535 }
536}
537
538impl From<Info> for Message {
539 fn from(info: Info) -> Self {
540 Self::Info(info)
541 }
542}
543
544impl fmt::Debug for Message {
545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546 match self {
547 Self::Subscribe(Subscribe { since, until, .. }) => {
548 write!(f, "Subscribe({since}..{until})")
549 }
550 Self::Announcement(Announcement { node, message, .. }) => {
551 write!(f, "Announcement({node}, {message:?})")
552 }
553 Self::Info(info) => {
554 write!(f, "Info({info:?})")
555 }
556 Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
557 Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
558 }
559 }
560}
561
562#[derive(Clone, Debug, PartialEq, Eq)]
564pub struct ZeroBytes(wire::Size);
565
566impl ZeroBytes {
567 pub fn new(size: wire::Size) -> Self {
568 ZeroBytes(size)
569 }
570
571 pub fn is_empty(&self) -> bool {
572 self.0 == 0
573 }
574
575 pub fn len(&self) -> usize {
576 self.0.into()
577 }
578}
579
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use std::str::FromStr;

    use fastrand;
    use qcheck_macros::quickcheck;
    use radicle::git::raw;

    use super::*;
    use crate::crypto::test::signer::MockSigner;
    use crate::prelude::*;
    use crate::test::arbitrary;
    use crate::wire::Encode;

    /// A refs announcement filled up to `REF_REMOTE_LIMIT` entries must
    /// survive an encode/decode round trip unchanged.
    #[test]
    fn test_ref_remote_limit() {
        let signer = MockSigner::default();
        let remote = *signer.public_key();
        let at = raw::Oid::zero().into();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);

        // Fill the vector right up to its capacity.
        for _ in 0..refs.capacity() {
            refs.push(RefsAt { remote, at }).unwrap();
        }

        let announcement = AnnouncementMessage::from(RefsAnnouncement {
            rid: arbitrary::gen(1),
            refs,
            timestamp: LocalTime::now().into(),
        })
        .signed(&MockSigner::default());
        let msg = Message::from(announcement);

        let mut bytes: Vec<u8> = Vec::new();
        assert!(msg.encode(&mut bytes).is_ok());

        let decoded = wire::deserialize(bytes.as_slice());
        assert!(decoded.is_ok());
        assert_eq!(msg, decoded.unwrap());
    }

    /// An inventory announcement with exactly `INVENTORY_LIMIT` entries must
    /// survive an encode/decode round trip unchanged.
    #[test]
    fn test_inventory_limit() {
        let announcement = InventoryAnnouncement {
            inventory: arbitrary::vec(INVENTORY_LIMIT)
                .try_into()
                .expect("size within bounds limit"),
            timestamp: LocalTime::now().into(),
        };
        let msg = Message::inventory(announcement, &MockSigner::default());

        let mut bytes: Vec<u8> = Vec::new();
        assert!(
            msg.encode(&mut bytes).is_ok(),
            "INVENTORY_LIMIT is a valid limit for encoding",
        );

        let decoded = wire::deserialize(bytes.as_slice());
        assert!(
            decoded.is_ok(),
            "INVENTORY_LIMIT is a valid limit for decoding"
        );
        assert_eq!(
            msg,
            decoded.unwrap(),
            "encoding and decoding should be safe for message at INVENTORY_LIMIT",
        );
    }

    /// A refs announcement signed by a signer must verify, for any repository id.
    #[quickcheck]
    fn prop_refs_announcement_signing(rid: RepoId) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
        let timestamp = Timestamp::EPOCH;
        let at = raw::Oid::zero().into();
        let entry = RefsAt {
            remote: *signer.public_key(),
            at,
        };
        let refs = BoundedVec::collect_from(&mut [entry].into_iter());
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
            rid,
            refs,
            timestamp,
        });

        assert!(message.signed(&signer).verify());
    }

    /// Pins the proof-of-work values of a fixed node announcement.
    // NOTE(review): the expected numbers presumably hold for the
    // `debug_assertions` variant of `POW_PARAMS` only — confirm before running
    // the tests in release mode.
    #[test]
    fn test_node_announcement_validate() {
        let ann = NodeAnnouncement {
            version: 1,
            features: node::Features::SEED,
            timestamp: Timestamp::try_from(42491841u64).unwrap(),
            alias: Alias::new("alice"),
            addresses: BoundedVec::new(),
            nonce: 0,
            agent: UserAgent::from_str("/heartwood:1.0.0/").unwrap(),
        };

        assert_eq!(ann.work(), 1);

        let solved_1 = ann.clone().solve(1).unwrap();
        assert_eq!(solved_1.work(), 1);

        let solved_8 = ann.clone().solve(8).unwrap();
        assert_eq!(solved_8.work(), 10);

        let solved_14 = ann.solve(14).unwrap();
        assert_eq!(solved_14.work(), 14);
    }
}