1use std::{fmt, io, mem};
2
3use nonempty::NonEmpty;
4use radicle::git;
5use radicle::node::device::Device;
6use radicle::storage::refs::RefsAt;
7
8use crate::crypto;
9use crate::identity::RepoId;
10use crate::node;
11use crate::node::{Address, Alias, UserAgent};
12use crate::prelude::BoundedVec;
13use crate::service::filter::Filter;
14use crate::service::{Link, NodeId, Timestamp};
15use crate::storage;
16use crate::wire;
17
18pub const ADDRESS_LIMIT: usize = 16;
20pub const REF_REMOTE_LIMIT: usize = 1024;
22pub const INVENTORY_LIMIT: usize = 2973;
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct Subscribe {
27 pub filter: Filter,
29 pub since: Timestamp,
31 pub until: Timestamp,
33}
34
35impl Subscribe {
36 pub fn all() -> Self {
37 Self {
38 filter: Filter::default(),
39 since: Timestamp::MIN,
40 until: Timestamp::MAX,
41 }
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct NodeAnnouncement {
48 pub version: u8,
50 pub features: node::Features,
52 pub timestamp: Timestamp,
54 pub alias: Alias,
56 pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
58 pub nonce: u64,
60 pub agent: UserAgent,
62}
63
64impl NodeAnnouncement {
65 pub fn work(&self) -> u32 {
78 let (n, r, p) = Announcement::POW_PARAMS;
79 let params = scrypt::Params::new(n, r, p, 32).expect("proof-of-work parameters are valid");
80 let mut output = [0u8; 32];
81
82 scrypt::scrypt(
83 wire::serialize(self).as_ref(),
84 Announcement::POW_SALT,
85 ¶ms,
86 &mut output,
87 )
88 .expect("proof-of-work output vector is a valid length");
89
90 if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|(_, &x)| x != 0) {
92 zero_bytes as u32 * 8 + non_zero.leading_zeros()
93 } else {
94 output.len() as u32 * 8
95 }
96 }
97
98 pub fn solve(mut self, target: u32) -> Option<Self> {
104 loop {
105 if let Some(nonce) = self.nonce.checked_add(1) {
106 self.nonce = nonce;
107
108 if self.work() >= target {
109 break;
110 }
111 } else {
112 return None;
113 }
114 }
115 Some(self)
116 }
117}
118
119impl wire::Encode for NodeAnnouncement {
120 fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
121 let mut n = 0;
122
123 n += self.version.encode(writer)?;
124 n += self.features.encode(writer)?;
125 n += self.timestamp.encode(writer)?;
126 n += self.alias.encode(writer)?;
127 n += self.addresses.encode(writer)?;
128 n += self.nonce.encode(writer)?;
129 n += self.agent.encode(writer)?;
130
131 Ok(n)
132 }
133}
134
135impl wire::Decode for NodeAnnouncement {
136 fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
137 let version = u8::decode(reader)?;
138 let features = node::Features::decode(reader)?;
139 let timestamp = Timestamp::decode(reader)?;
140 let alias = wire::Decode::decode(reader)?;
141 let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(reader)?;
142 let nonce = u64::decode(reader)?;
143 let agent = match UserAgent::decode(reader) {
144 Ok(ua) => ua,
145 Err(e) if e.is_eof() => UserAgent::default(),
146 Err(e) => return Err(e),
147 };
148
149 Ok(Self {
150 version,
151 features,
152 timestamp,
153 alias,
154 addresses,
155 nonce,
156 agent,
157 })
158 }
159}
160
161#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct RefsAnnouncement {
164 pub rid: RepoId,
166 pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
168 pub timestamp: Timestamp,
170}
171
172#[derive(Default)]
174pub struct RefsStatus {
175 pub want: Vec<RefsAt>,
178 pub have: Vec<RefsAt>,
180}
181
182impl RefsStatus {
183 pub fn new<D: node::refs::Store>(
191 rid: RepoId,
192 refs: NonEmpty<RefsAt>,
193 db: &D,
194 ) -> Result<RefsStatus, storage::Error> {
195 let mut status = RefsStatus::default();
196 for theirs in refs.iter() {
197 status.insert(&rid, *theirs, db)?;
198 }
199 Ok(status)
200 }
201
202 fn insert<D: node::refs::Store>(
203 &mut self,
204 repo: &RepoId,
205 theirs: RefsAt,
206 db: &D,
207 ) -> Result<(), storage::Error> {
208 match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
209 Ok(Some((ours, _))) => {
210 if theirs.at != ours {
211 self.want.push(theirs);
212 } else {
213 self.have.push(theirs);
214 }
215 }
216 Ok(None) => {
217 self.want.push(theirs);
218 }
219 Err(e) => {
220 log::warn!(
221 target: "service",
222 "Error getting cached ref of {repo} for refs status: {e}"
223 );
224 }
225 }
226 Ok(())
227 }
228}
229
230#[derive(Debug, Clone, PartialEq, Eq)]
233pub struct InventoryAnnouncement {
234 pub inventory: BoundedVec<RepoId, INVENTORY_LIMIT>,
236 pub timestamp: Timestamp,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
245pub enum Info {
246 RefsAlreadySynced { rid: RepoId, at: git::Oid },
249}
250
251#[derive(Clone, PartialEq, Eq)]
253pub enum AnnouncementMessage {
254 Inventory(InventoryAnnouncement),
256 Node(NodeAnnouncement),
258 Refs(RefsAnnouncement),
260}
261
262impl AnnouncementMessage {
263 pub fn signed<G>(self, signer: &Device<G>) -> Announcement
265 where
266 G: crypto::signature::Signer<crypto::Signature>,
267 {
268 use crypto::signature::Signer as _;
269
270 let msg = wire::serialize(&self);
271 let signature = signer.sign(&msg);
272
273 Announcement {
274 node: *signer.public_key(),
275 message: self,
276 signature,
277 }
278 }
279
280 pub fn timestamp(&self) -> Timestamp {
281 match self {
282 Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
283 Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
284 Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
285 }
286 }
287
288 pub fn is_node_announcement(&self) -> bool {
289 matches!(self, Self::Node(_))
290 }
291}
292
293impl From<NodeAnnouncement> for AnnouncementMessage {
294 fn from(ann: NodeAnnouncement) -> Self {
295 Self::Node(ann)
296 }
297}
298
299impl From<InventoryAnnouncement> for AnnouncementMessage {
300 fn from(ann: InventoryAnnouncement) -> Self {
301 Self::Inventory(ann)
302 }
303}
304
305impl From<RefsAnnouncement> for AnnouncementMessage {
306 fn from(ann: RefsAnnouncement) -> Self {
307 Self::Refs(ann)
308 }
309}
310
311impl fmt::Debug for AnnouncementMessage {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 match self {
314 Self::Node(message) => write!(f, "Node({})", message.timestamp),
315 Self::Inventory(message) => {
316 write!(
317 f,
318 "Inventory([{}], {})",
319 message
320 .inventory
321 .iter()
322 .map(|i| i.to_string())
323 .collect::<Vec<String>>()
324 .join(", "),
325 message.timestamp
326 )
327 }
328 Self::Refs(message) => {
329 write!(
330 f,
331 "Refs({}, {}, {:?})",
332 message.rid, message.timestamp, message.refs
333 )
334 }
335 }
336 }
337}
338
339#[derive(Debug, Clone, PartialEq, Eq)]
340pub struct Announcement {
341 pub node: NodeId,
343 pub signature: crypto::Signature,
345 pub message: AnnouncementMessage,
347}
348
349impl Announcement {
350 #[cfg(debug_assertions)]
362 pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
363 #[cfg(not(debug_assertions))]
364 pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
365 pub const POW_SALT: &'static [u8] = &[b'r', b'a', b'd'];
367
368 pub fn verify(&self) -> bool {
370 let msg = wire::serialize(&self.message);
371 self.node.verify(msg, &self.signature).is_ok()
372 }
373
374 pub fn matches(&self, filter: &Filter) -> bool {
375 match &self.message {
376 AnnouncementMessage::Inventory(_) => true,
377 AnnouncementMessage::Node(_) => true,
378 AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
379 }
380 }
381
382 pub fn variant_eq(&self, other: &Self) -> bool {
384 std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
385 }
386
387 pub fn timestamp(&self) -> Timestamp {
389 self.message.timestamp()
390 }
391}
392
393#[derive(Clone, PartialEq, Eq)]
396pub enum Message {
397 Subscribe(Subscribe),
399
400 Announcement(Announcement),
403
404 Info(Info),
408
409 Ping(Ping),
414
415 Pong {
417 zeroes: ZeroBytes,
419 },
420}
421
422impl PartialOrd for Message {
423 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
424 Some(self.cmp(other))
425 }
426}
427
428impl Ord for Message {
429 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
430 let this = wire::serialize(self);
431 let other = wire::serialize(other);
432
433 this.cmp(&other)
434 }
435}
436
437impl Message {
438 pub fn announcement(
439 node: NodeId,
440 message: impl Into<AnnouncementMessage>,
441 signature: crypto::Signature,
442 ) -> Self {
443 Announcement {
444 node,
445 signature,
446 message: message.into(),
447 }
448 .into()
449 }
450
451 pub fn node<G: crypto::signature::Signer<crypto::Signature>>(
452 message: NodeAnnouncement,
453 signer: &Device<G>,
454 ) -> Self {
455 AnnouncementMessage::from(message).signed(signer).into()
456 }
457
458 pub fn inventory<G: crypto::signature::Signer<crypto::Signature>>(
459 message: InventoryAnnouncement,
460 signer: &Device<G>,
461 ) -> Self {
462 AnnouncementMessage::from(message).signed(signer).into()
463 }
464
465 pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
466 Self::Subscribe(Subscribe {
467 filter,
468 since,
469 until,
470 })
471 }
472
473 pub fn log(&self, level: log::Level, remote: &NodeId, link: Link) {
474 if !log::log_enabled!(level) {
475 return;
476 }
477 let (verb, prep) = if link.is_inbound() {
478 ("Received", "from")
479 } else {
480 ("Sending", "to")
481 };
482 let msg = match self {
483 Self::Announcement(Announcement { node, message, .. }) => match message {
484 AnnouncementMessage::Node(NodeAnnouncement { addresses, timestamp, .. }) => format!(
485 "{verb} node announcement of {node} with {} address(es) {prep} {remote} (t={timestamp})",
486 addresses.len()
487 ),
488 AnnouncementMessage::Refs(RefsAnnouncement { rid, refs, timestamp }) => format!(
489 "{verb} refs announcement of {node} for {rid} with {} remote(s) {prep} {remote} (t={timestamp})",
490 refs.len()
491 ),
492 AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, timestamp }) => {
493 format!(
494 "{verb} inventory announcement of {node} with {} item(s) {prep} {remote} (t={timestamp})",
495 inventory.len()
496 )
497 }
498 },
499 Self::Info(Info::RefsAlreadySynced { rid, .. }) => {
500 format!(
501 "{verb} `refs-already-synced` info {prep} {remote} for {rid}"
502 )
503 },
504 Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
505 Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
506 Self::Subscribe(Subscribe { .. }) => {
507 format!("{verb} subscription filter {prep} {remote}")
508 }
509 };
510 log::log!(target: "service", level, "{msg}");
511 }
512}
513
514#[derive(Debug, PartialEq, Eq, Clone)]
516pub struct Ping {
517 pub ponglen: wire::Size,
519 pub zeroes: ZeroBytes,
521}
522
523impl Ping {
524 pub const MAX_PING_ZEROES: wire::Size = Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size - mem::size_of::<wire::Size>() as wire::Size; pub const MAX_PONG_ZEROES: wire::Size =
531 Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size; pub fn new(rng: &mut fastrand::Rng) -> Self {
535 let ponglen = rng.u16(0..Self::MAX_PONG_ZEROES);
536
537 Ping {
538 ponglen,
539 zeroes: ZeroBytes::new(rng.u16(0..Self::MAX_PING_ZEROES)),
540 }
541 }
542}
543
544impl From<Announcement> for Message {
545 fn from(ann: Announcement) -> Self {
546 Self::Announcement(ann)
547 }
548}
549
550impl From<Info> for Message {
551 fn from(info: Info) -> Self {
552 Self::Info(info)
553 }
554}
555
556impl fmt::Debug for Message {
557 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
558 match self {
559 Self::Subscribe(Subscribe { since, until, .. }) => {
560 write!(f, "Subscribe({since}..{until})")
561 }
562 Self::Announcement(Announcement { node, message, .. }) => {
563 write!(f, "Announcement({node}, {message:?})")
564 }
565 Self::Info(info) => {
566 write!(f, "Info({info:?})")
567 }
568 Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
569 Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
570 }
571 }
572}
573
574#[derive(Clone, Debug, PartialEq, Eq)]
576pub struct ZeroBytes(wire::Size);
577
578impl ZeroBytes {
579 pub fn new(size: wire::Size) -> Self {
580 ZeroBytes(size)
581 }
582
583 pub fn is_empty(&self) -> bool {
584 self.0 == 0
585 }
586
587 pub fn len(&self) -> usize {
588 self.0.into()
589 }
590}
591
592#[cfg(test)]
593#[allow(clippy::unwrap_used)]
594mod tests {
595 use std::str::FromStr;
596
597 use fastrand;
598 use qcheck_macros::quickcheck;
599 use radicle::git::raw;
600
601 use super::*;
602 use crate::prelude::*;
603 use crate::test::arbitrary;
604 use crate::wire::Encode;
605
606 #[test]
607 fn test_ref_remote_limit() {
608 let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
609 let signer = Device::mock();
610 let at = raw::Oid::zero().into();
611
612 assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);
613
614 for _ in 0..refs.capacity() {
615 refs.push(RefsAt {
616 remote: *signer.public_key(),
617 at,
618 })
619 .unwrap();
620 }
621
622 let msg: Message = AnnouncementMessage::from(RefsAnnouncement {
623 rid: arbitrary::gen(1),
624 refs,
625 timestamp: LocalTime::now().into(),
626 })
627 .signed(&Device::mock())
628 .into();
629
630 let mut buf: Vec<u8> = Vec::new();
631 assert!(msg.encode(&mut buf).is_ok());
632
633 let decoded = wire::deserialize(buf.as_slice());
634 assert!(decoded.is_ok());
635 assert_eq!(msg, decoded.unwrap());
636 }
637
638 #[test]
639 fn test_inventory_limit() {
640 let msg = Message::inventory(
641 InventoryAnnouncement {
642 inventory: arbitrary::vec(INVENTORY_LIMIT)
643 .try_into()
644 .expect("size within bounds limit"),
645 timestamp: LocalTime::now().into(),
646 },
647 &Device::mock(),
648 );
649 let mut buf: Vec<u8> = Vec::new();
650 assert!(
651 msg.encode(&mut buf).is_ok(),
652 "INVENTORY_LIMIT is a valid limit for encoding",
653 );
654
655 let decoded = wire::deserialize(buf.as_slice());
656 assert!(
657 decoded.is_ok(),
658 "INVENTORY_LIMIT is a valid limit for decoding"
659 );
660 assert_eq!(
661 msg,
662 decoded.unwrap(),
663 "encoding and decoding should be safe for message at INVENTORY_LIMIT",
664 );
665 }
666
667 #[quickcheck]
668 fn prop_refs_announcement_signing(rid: RepoId) {
669 let signer = Device::mock_rng(&mut fastrand::Rng::new());
670 let timestamp = Timestamp::EPOCH;
671 let at = raw::Oid::zero().into();
672 let refs = BoundedVec::collect_from(
673 &mut [RefsAt {
674 remote: *signer.public_key(),
675 at,
676 }]
677 .into_iter(),
678 );
679 let message = AnnouncementMessage::Refs(RefsAnnouncement {
680 rid,
681 refs,
682 timestamp,
683 });
684 let ann = message.signed(&signer);
685
686 assert!(ann.verify());
687 }
688
689 #[test]
690 fn test_node_announcement_validate() {
691 let ann = NodeAnnouncement {
692 version: 1,
693 features: node::Features::SEED,
694 timestamp: Timestamp::try_from(42491841u64).unwrap(),
695 alias: Alias::new("alice"),
696 addresses: BoundedVec::new(),
697 nonce: 0,
698 agent: UserAgent::from_str("/heartwood:1.0.0/").unwrap(),
699 };
700
701 assert_eq!(ann.work(), 1);
702 assert_eq!(ann.clone().solve(1).unwrap().work(), 1);
703 assert_eq!(ann.clone().solve(8).unwrap().work(), 10);
704 assert_eq!(ann.solve(14).unwrap().work(), 14);
705 }
706}