Skip to main content

radicle/
node.rs

1#![allow(clippy::type_complexity)]
2#![allow(clippy::collapsible_if)]
3mod features;
4
5pub mod address;
6pub mod command;
7pub mod config;
8pub mod db;
9pub mod device;
10pub mod events;
11pub mod notifications;
12pub mod policy;
13pub mod refs;
14pub mod routing;
15pub mod seed;
16pub mod sync;
17pub mod timestamp;
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
20use std::fmt::Display;
21use std::io::{BufRead, BufReader};
22use std::marker::PhantomData;
23use std::net::IpAddr;
24use std::net::Ipv6Addr;
25use std::ops::{ControlFlow, Deref};
26use std::path::{Path, PathBuf};
27use std::str::FromStr;
28use std::{fmt, io, net, thread, time};
29
30#[cfg(unix)]
31use std::os::unix::net::UnixStream;
32#[cfg(windows)]
33use uds_windows::UnixStream;
34
35use amplify::WrapperMut;
36use cyphernet::addr::{AddrParseError, NetAddr};
37use localtime::{LocalDuration, LocalTime};
38use serde::de::DeserializeOwned;
39use serde::{Deserialize, Serialize};
40use serde_json as json;
41
42use crate::crypto::PublicKey;
43use crate::git;
44use crate::identity::RepoId;
45use crate::profile;
46use crate::storage::refs::{FeatureLevel, RefsAt};
47use crate::storage::RefUpdate;
48
49pub use address::KnownAddress;
50pub use command::{Command, CommandResult, ConnectOptions, Success, DEFAULT_TIMEOUT};
51pub use config::Config;
52pub use cyphernet::addr::{HostName, PeerAddr, PeerAddrParseError};
53pub use db::Database;
54pub use events::{Event, Events};
55pub use features::Features;
56pub use radicle_core::NodeId;
57pub use seed::SyncedAt;
58pub use timestamp::Timestamp;
59
/// Peer-to-peer protocol version.
pub const PROTOCOL_VERSION: u8 = 1;
/// Default radicle protocol port.
pub const DEFAULT_PORT: u16 = 8776;
/// Default timeout (5 seconds) when waiting for an event to be received on the
/// [`Handle::subscribe`] channel.
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
/// Maximum length in bytes of a node alias.
///
/// Checked when an [`Alias`] is parsed from a string.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty threshold at which point we avoid connecting to this node.
///
/// See [`Penalty::is_connect_threshold_reached`].
pub const PENALTY_CONNECT_THRESHOLD: u8 = 32;
/// Penalty threshold at which point we ban this node.
///
/// See [`Penalty::is_ban_threshold_reached`].
pub const PENALTY_BAN_THRESHOLD: u8 = 64;
/// Filename of node database under the node directory.
pub const NODE_DB_FILE: &str = "node.db";
/// Filename of policies database under the node directory.
pub const POLICIES_DB_FILE: &str = "policies.db";
/// Filename of notifications database under the node directory.
pub const NOTIFICATIONS_DB_FILE: &str = "notifications.db";
79
/// Progress of the ping exchange with a peer.
///
/// Defaults to [`PingState::None`].
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PingState {
    #[default]
    /// The peer has not been sent a ping.
    None,
    /// A ping has been sent and is waiting on the peer's response.
    AwaitingResponse {
        /// Length of pong payload expected.
        len: u16,
        /// Since when are we waiting.
        since: LocalTime,
    },
    /// The peer was successfully pinged.
    Ok,
}
95
/// Connection state of a peer session.
///
/// Variant and field names are serialized in camelCase. The fields marked
/// `#[serde(skip)]` on [`State::Connected`] are left out of the serialized
/// form.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum State {
    /// Initial state for outgoing connections.
    Initial,
    /// Connection attempted successfully.
    Attempted,
    /// Initial state after handshake protocol hand-off.
    #[serde(rename_all = "camelCase")]
    Connected {
        /// Connected since this time.
        since: LocalTime,
        /// Ping state.
        #[serde(skip)]
        ping: PingState,
        /// Measured latencies for this peer.
        #[serde(skip)]
        latencies: VecDeque<LocalDuration>,
        /// Whether the connection is stable.
        #[serde(skip)]
        stable: bool,
    },
    /// When a peer is disconnected.
    #[serde(rename_all = "camelCase")]
    Disconnected {
        /// Since when has this peer been disconnected.
        since: LocalTime,
        /// When to retry the connection.
        retry_at: LocalTime,
    },
}
129
130impl State {
131    /// Check if this is a connected state.
132    pub fn is_connected(&self) -> bool {
133        matches!(self, Self::Connected { .. })
134    }
135}
136
137impl fmt::Display for State {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        match self {
140            Self::Initial => {
141                write!(f, "initial")
142            }
143            Self::Attempted { .. } => {
144                write!(f, "attempted")
145            }
146            Self::Connected { .. } => {
147                write!(f, "connected")
148            }
149            Self::Disconnected { .. } => {
150                write!(f, "disconnected")
151            }
152        }
153    }
154}
155
/// Severity of a peer misbehavior or a connection problem.
///
/// Each level carries an explicit integer discriminant.
// NOTE(review): the discriminants are presumably the amount added to a
// `Penalty`; confirm against the code that applies them.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Severity {
    Low = 0,
    Medium = 1,
    High = 8,
}
163
164/// Node connection penalty. Nodes with a high penalty are deprioritized as peers.
165#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
166pub struct Penalty(u8);
167
168impl Penalty {
169    /// If the penalty threshold is reached, at which point we should just avoid
170    /// connecting to this node.
171    pub fn is_connect_threshold_reached(&self) -> bool {
172        self.0 >= PENALTY_CONNECT_THRESHOLD
173    }
174
175    pub fn is_ban_threshold_reached(&self) -> bool {
176        self.0 >= PENALTY_BAN_THRESHOLD
177    }
178}
179
/// Repository sync status for our own refs.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `status` key, in camelCase.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[serde(tag = "status")]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum SyncStatus {
    /// We're in sync.
    #[serde(rename_all = "camelCase")]
    Synced {
        /// At what ref was the remote synced at.
        at: SyncedAt,
    },
    /// We're out of sync.
    #[serde(rename_all = "camelCase")]
    OutOfSync {
        /// Local head of our `rad/sigrefs`.
        local: SyncedAt,
        /// Remote head of our `rad/sigrefs`.
        remote: SyncedAt,
    },
}
201
202impl Ord for SyncStatus {
203    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
204        match (self, other) {
205            (Self::Synced { at: left }, Self::Synced { at: right }) => left.cmp(right),
206            (Self::Synced { at }, Self::OutOfSync { remote, .. }) => at.cmp(remote),
207            (Self::OutOfSync { remote, .. }, Self::Synced { at }) => remote.cmp(at),
208            (Self::OutOfSync { remote: left, .. }, Self::OutOfSync { remote: right, .. }) => {
209                left.cmp(right)
210            }
211        }
212    }
213}
214
impl PartialOrd for SyncStatus {
    /// Always returns `Some`: the partial order delegates to the total order
    /// defined by the `Ord` implementation, so the two can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
220
/// Node user agent.
///
/// A thin wrapper around the user agent string. Use the [`FromStr`]
/// implementation to obtain a validated value; [`Default`] yields
/// `/radicle/`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct UserAgent(String);
224
225impl UserAgent {
226    /// Return a reference to the user agent string.
227    pub fn as_str(&self) -> &str {
228        self.0.as_str()
229    }
230}
231
232impl Default for UserAgent {
233    fn default() -> Self {
234        UserAgent(String::from("/radicle/"))
235    }
236}
237
238impl std::fmt::Display for UserAgent {
239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240        self.0.fmt(f)
241    }
242}
243
244impl FromStr for UserAgent {
245    type Err = String;
246
247    fn from_str(input: &str) -> Result<Self, Self::Err> {
248        let reserved = ['/', ':'];
249
250        if input.len() > 64 {
251            return Err(input.to_owned());
252        }
253        let Some(s) = input.strip_prefix('/') else {
254            return Err(input.to_owned());
255        };
256        let Some(s) = s.strip_suffix('/') else {
257            return Err(input.to_owned());
258        };
259        if s.is_empty() {
260            return Err(input.to_owned());
261        }
262        if s.split('/').all(|segment| {
263            if let Some((client, version)) = segment.split_once(':') {
264                if client.is_empty() || version.is_empty() {
265                    false
266                } else {
267                    let client = client
268                        .chars()
269                        .all(|c| c.is_ascii_graphic() && !reserved.contains(&c));
270                    let version = version
271                        .chars()
272                        .all(|c| c.is_ascii_graphic() || !reserved.contains(&c));
273                    client && version
274                }
275            } else {
276                true
277            }
278        }) {
279            Ok(Self(input.to_owned()))
280        } else {
281            Err(input.to_owned())
282        }
283    }
284}
285
286impl AsRef<str> for UserAgent {
287    fn as_ref(&self) -> &str {
288        self.0.as_str()
289    }
290}
291
/// Node alias, i.e. a short and memorable name for it.
///
/// (De)serialized through `String`: deserialization goes through
/// `TryFrom<String>` and therefore validates the value.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Alias(
    // To exclude control characters, one might be inclined to use the character
    // class `[[:cntrl:]]` which is understood by the `regex` crate.
    // However, the patterns in JSON schema must conform to ECMA-262, which does
    // not specify the character class.
    // Thus, we unfold its definition from <https://www.unicode.org/reports/tr18/#cntrl>,
    // which refers to the "general category" named "Cc",
    // see <https://unicode.org/reports/tr44/#General_Category_Values>.
    // We obtain the two ranges below from <https://www.unicode.org/notes/tn36/Categories.txt>.
    //
    // NOTE(review): the pattern admits the empty string (`{0,32}`), whereas
    // parsing an alias from a string rejects empty input; confirm whether the
    // schema is meant to be looser than the parser.
    #[cfg_attr(
        feature = "schemars",
        schemars(regex(pattern = r"^[^\x00-\x1F\x7F-\x9F\s]{0,32}$"), length(max = 32))
    )]
    String,
);
311
312impl Alias {
313    /// Create a new alias from a string. Panics if the string is not a valid alias.
314    pub fn new(alias: impl ToString) -> Self {
315        let alias = alias.to_string();
316
317        match Self::from_str(&alias) {
318            Ok(a) => a,
319            Err(e) => panic!("Alias::new: {e}"),
320        }
321    }
322
323    /// Return a reference to the alias string.
324    pub fn as_str(&self) -> &str {
325        self.0.as_str()
326    }
327}
328
329impl From<Alias> for String {
330    fn from(value: Alias) -> Self {
331        value.0
332    }
333}
334
335impl From<&NodeId> for Alias {
336    fn from(nid: &NodeId) -> Self {
337        Alias(nid.to_string())
338    }
339}
340
341impl fmt::Display for Alias {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        self.0.fmt(f)
344    }
345}
346
347impl Deref for Alias {
348    type Target = str;
349
350    fn deref(&self) -> &Self::Target {
351        &self.0
352    }
353}
354
355impl AsRef<str> for Alias {
356    fn as_ref(&self) -> &str {
357        self.0.as_str()
358    }
359}
360
361impl From<&Alias> for [u8; 32] {
362    fn from(input: &Alias) -> [u8; 32] {
363        let mut alias = [0u8; 32];
364
365        alias[..input.len()].copy_from_slice(input.as_bytes());
366        alias
367    }
368}
369
/// Reasons a string is rejected when parsed as an [`Alias`].
#[derive(thiserror::Error, Debug)]
pub enum AliasError {
    /// The input string was empty.
    #[error("alias cannot be empty")]
    Empty,
    /// The input string was longer than [`MAX_ALIAS_LENGTH`] bytes.
    #[error("alias cannot be greater than {MAX_ALIAS_LENGTH} bytes")]
    MaxBytesExceeded,
    /// The input string contained a whitespace or control character.
    #[error("alias cannot contain whitespace or control characters")]
    InvalidCharacter,
}
379
380impl FromStr for Alias {
381    type Err = AliasError;
382
383    fn from_str(s: &str) -> Result<Self, Self::Err> {
384        if s.is_empty() {
385            return Err(AliasError::Empty);
386        }
387        if s.chars().any(|c| c.is_control() || c.is_whitespace()) {
388            return Err(AliasError::InvalidCharacter);
389        }
390        if s.len() > MAX_ALIAS_LENGTH {
391            return Err(AliasError::MaxBytesExceeded);
392        }
393        Ok(Self(s.to_owned()))
394    }
395}
396
397impl TryFrom<String> for Alias {
398    type Error = AliasError;
399
400    fn try_from(value: String) -> Result<Self, Self::Error> {
401        Alias::from_str(&value)
402    }
403}
404
405impl TryFrom<&sqlite::Value> for Alias {
406    type Error = sqlite::Error;
407
408    fn try_from(value: &sqlite::Value) -> Result<Self, Self::Error> {
409        match value {
410            sqlite::Value::String(s) => Self::from_str(s).map_err(|e| sqlite::Error {
411                code: None,
412                message: Some(e.to_string()),
413            }),
414            _ => Err(sqlite::Error {
415                code: None,
416                message: Some(format!(
417                    "sql: invalid type {:?} for alias, expected {:?}",
418                    value.kind(),
419                    sqlite::Type::String
420                )),
421            }),
422        }
423    }
424}
425
/// Peer public protocol address.
///
/// Wraps a [`NetAddr`] over a [`HostName`], i.e. a host together with a port.
/// (De)serialized through its string form.
#[derive(Clone, Eq, PartialEq, Debug, Hash, From, Wrapper, WrapperMut, Serialize, Deserialize)]
#[wrapper(Deref)]
#[wrapper_mut(DerefMut)]
#[serde(try_from = "String", into = "String")]
#[cfg_attr(
    feature = "schemars",
    derive(schemars::JsonSchema),
    schemars(description = "\
    An IP address, or a DNS name, or a Tor onion name, followed by the symbol ':', \
    followed by a TCP port number.",
    extend(
        "examples" = [
            "xmrhfasfg5suueegrnc4gsgyi2tyclcy5oz7f5drnrodmdtob6t2ioyd.onion:8776",
            "seed.example.com:8776",
            "192.0.2.0:31337",
        ],
        "pattern" = "^.+:((6553[0-5])|(655[0-2][0-9])|(65[0-4][0-9]{2})|(6[0-4][0-9]{3})|([1-5][0-9]{4})|([0-5]{0,5})|([0-9]{1,4}))$",
    ),
))]
pub struct Address(NetAddr<HostName>);
447
448impl Address {
449    /// Check whether this address is from the local network.
450    pub fn is_local(&self) -> bool {
451        match &self.0.host {
452            HostName::Ip(ip) => address::is_local(ip),
453            HostName::Dns(name) => {
454                let name = name.strip_suffix(".").unwrap_or(name);
455
456                // RFC 2606, Section 2
457                // <https://datatracker.ietf.org/doc/html/rfc2606#section-2>
458                name.ends_with(".localhost") || name == "localhost"
459            }
460            _ => false,
461        }
462    }
463
464    /// Check whether this address is globally routable.
465    pub fn is_routable(&self) -> bool {
466        match self.0.host {
467            HostName::Ip(ip) => address::is_routable(&ip),
468            HostName::Dns(_) => !self.is_local(),
469            _ => true,
470        }
471    }
472
473    /// Return the [`HostName`] of the [`Address`].
474    pub fn host(&self) -> &HostName {
475        &self.0.host
476    }
477
478    /// Returns `true` if the [`HostName`] is a Tor onion address.
479    pub fn is_onion(&self) -> bool {
480        match self.0.host {
481            HostName::Tor(_) => true,
482            _ => false,
483        }
484    }
485
486    /// Return the port number of the [`Address`].
487    pub fn port(&self) -> u16 {
488        self.0.port
489    }
490
491    pub fn display_compact(&self) -> impl Display {
492        let host = match self.host() {
493            HostName::Ip(IpAddr::V4(ip)) => ip.to_string(),
494            HostName::Ip(IpAddr::V6(ip)) => format!("[{ip}]"),
495            HostName::Dns(dns) => dns.clone(),
496            HostName::Tor(onion) => {
497                let onion = onion.to_string();
498                let start = onion.chars().take(8).collect::<String>();
499                let end = onion
500                    .chars()
501                    .skip(onion.len() - 8 - ".onion".len())
502                    .collect::<String>();
503                format!("{start}…{end}")
504            }
505            _ => unreachable!(),
506        };
507
508        let port = self.port().to_string();
509
510        format!("{host}:{port}")
511    }
512}
513
514impl Display for Address {
515    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516        match self.host() {
517            HostName::Ip(IpAddr::V6(ip)) => {
518                write!(f, "[{ip}]:{}", self.port())
519            }
520            _ => self.0.fmt(f),
521        }
522    }
523}
524
525impl TryFrom<String> for Address {
526    type Error = <Self as FromStr>::Err;
527
528    fn try_from(value: String) -> Result<Self, Self::Error> {
529        value.parse()
530    }
531}
532
533impl From<Address> for String {
534    fn from(value: Address) -> Self {
535        value.to_string()
536    }
537}
538
539impl FromStr for Address {
540    type Err = AddrParseError;
541
542    fn from_str(s: &str) -> Result<Self, Self::Err> {
543        let (host, port) = s.rsplit_once(':').ok_or(AddrParseError::PortAbsent)?;
544
545        let host = if let Some(host) = host
546            .strip_prefix('[')
547            .and_then(|host| host.strip_suffix(']'))
548        {
549            HostName::Ip(host.parse::<Ipv6Addr>()?.into())
550        } else {
551            // Warn on IPv6 addresses that are not enclosed in `[` and `]`.
552            host.parse().map(|host| match host {
553                HostName::Ip(IpAddr::V6(addr)) => {
554                    log::warn!("Address format will change in the future. '{s}' should be changed to '[{addr}]:{port}' to stay compatible. Refer to RFC 5926, Sec. 6 as well as RFC 3986, Sec. D.1. and RFC 2732, Sec. 2.");
555                    host
556                },
557                host => host,
558            })?
559        };
560
561        let port = port.parse().map_err(|_| AddrParseError::InvalidPort)?;
562
563        Ok(Self(NetAddr::new(host, port)))
564    }
565}
566
impl cyphernet::addr::Host for Address {
    /// Delegates to the wrapped address.
    fn requires_proxy(&self) -> bool {
        self.0.requires_proxy()
    }
}
572
impl cyphernet::addr::Addr for Address {
    /// Delegates to the wrapped address.
    fn port(&self) -> u16 {
        self.0.port()
    }
}
578
579impl From<net::SocketAddr> for Address {
580    fn from(addr: net::SocketAddr) -> Self {
581        Address(NetAddr {
582            host: HostName::Ip(addr.ip()),
583            port: addr.port(),
584        })
585    }
586}
587
588impl From<Address> for HostName {
589    fn from(addr: Address) -> Self {
590        addr.0.host
591    }
592}
593
/// Connection link direction.
///
/// Variant names are serialized in camelCase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum Link {
    /// Outgoing connection.
    Outbound,
    /// Incoming connection.
    Inbound,
}
604
605impl Link {
606    /// Check if this is an outbound link.
607    pub fn is_outbound(&self) -> bool {
608        matches!(self, Self::Outbound)
609    }
610
611    /// Check if this is an inbound link.
612    pub fn is_inbound(&self) -> bool {
613        matches!(self, Self::Inbound)
614    }
615}
616
617impl std::fmt::Display for Link {
618    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619        match self {
620            Link::Outbound => write!(f, "outbound"),
621            Link::Inbound => write!(f, "inbound"),
622        }
623    }
624}
625
/// An established network connection with a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Session {
    /// Node ID of the peer.
    pub nid: NodeId,
    /// Direction of the connection.
    pub link: Link,
    /// Address of the peer.
    pub addr: Address,
    /// Current connection state.
    pub state: State,
}
635
636impl Session {
637    /// Calls [`State::is_connected`] on the session state.
638    pub fn is_connected(&self) -> bool {
639        self.state.is_connected()
640    }
641}
642
/// A seed for some repository, with metadata about its status.
///
/// Field names are serialized in camelCase. `state` and `sync` are omitted
/// from the serialized form when `None`, and default to `None` when absent
/// on deserialization.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seed {
    /// The Node ID.
    pub nid: NodeId,
    /// Known addresses for this seed.
    pub addrs: Vec<KnownAddress>,
    /// The seed's session state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state: Option<State>,
    /// The seed's sync status, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sync: Option<SyncStatus>,
}
659
660impl Seed {
661    /// Check if this is a "connected" seed.
662    pub fn is_connected(&self) -> bool {
663        matches!(self.state, Some(State::Connected { .. }))
664    }
665
666    /// Check if this seed is in sync with us.
667    pub fn is_synced(&self) -> bool {
668        matches!(self.sync, Some(SyncStatus::Synced { .. }))
669    }
670
671    pub fn new(
672        nid: NodeId,
673        addrs: Vec<KnownAddress>,
674        state: Option<State>,
675        sync: Option<SyncStatus>,
676    ) -> Self {
677        Self {
678            nid,
679            addrs,
680            state,
681            sync,
682        }
683    }
684}
685
#[derive(Clone, Debug, Serialize, Deserialize)]
/// Represents a set of seeds with associated metadata. Uses an RNG
/// underneath, so every iteration returns a different ordering.
///
/// (De)serialized as a plain list of [`Seed`]s.
#[serde(into = "Vec<Seed>", from = "Vec<Seed>")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seeds(
    // Seeds keyed by their Node ID.
    #[cfg_attr(feature = "schemars", schemars(with = "Vec<Seed>"))]
    address::AddressBook<NodeId, Seed>,
);
695
696impl Seeds {
697    /// Create a new seeds list from an RNG.
698    pub fn new(rng: fastrand::Rng) -> Self {
699        Self(address::AddressBook::new(rng))
700    }
701
702    /// Insert a seed.
703    pub fn insert(&mut self, seed: Seed) {
704        self.0.insert(seed.nid, seed);
705    }
706
707    /// Check membership.
708    pub fn contains(&self, nid: &NodeId) -> bool {
709        self.0.contains_key(nid)
710    }
711
712    /// Number of seeds.
713    pub fn len(&self) -> usize {
714        self.0.len()
715    }
716
717    /// Check if there are any seeds.
718    pub fn is_empty(&self) -> bool {
719        self.0.is_empty()
720    }
721
722    /// Partitions the list of seeds into connected and disconnected seeds.
723    /// Note that the disconnected seeds may be in a "connecting" state.
724    pub fn partition(&self) -> (Vec<Seed>, Vec<Seed>) {
725        self.iter().cloned().partition(|s| s.is_connected())
726    }
727
728    /// Return connected seeds.
729    pub fn connected(&self) -> impl Iterator<Item = &Seed> {
730        self.iter().filter(|s| s.is_connected())
731    }
732
733    /// Return all seeds.
734    pub fn iter(&self) -> impl Iterator<Item = &Seed> {
735        self.0.shuffled().map(|(_, v)| v)
736    }
737
738    /// Check if a seed is connected.
739    pub fn is_connected(&self, nid: &NodeId) -> bool {
740        self.0.get(nid).is_some_and(|s| s.is_connected())
741    }
742
743    /// Return a new seeds object with the given RNG.
744    pub fn with(self, rng: fastrand::Rng) -> Self {
745        Self(self.0.with(rng))
746    }
747}
748
749impl From<Seeds> for Vec<Seed> {
750    fn from(seeds: Seeds) -> Vec<Seed> {
751        seeds.0.into_shuffled().map(|(_, v)| v).collect()
752    }
753}
754
755impl From<Vec<Seed>> for Seeds {
756    fn from(other: Vec<Seed>) -> Seeds {
757        Seeds(address::AddressBook::from_iter(
758            other.into_iter().map(|s| (s.nid, s)),
759        ))
760    }
761}
762
/// Outcome of fetching a repository from a node.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `status` key, in camelCase.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum FetchResult {
    /// The fetch succeeded.
    Success {
        /// The reference updates.
        updated: Vec<RefUpdate>,
        // NOTE(review): presumably the namespaces that were fetched; confirm.
        namespaces: HashSet<NodeId>,
        // NOTE(review): presumably whether the fetch was a clone; confirm.
        clone: bool,
    },
    // TODO: Create enum for reason.
    /// The fetch failed.
    Failed {
        /// Description of the failure.
        reason: String,
    },
}
777
778impl FetchResult {
779    pub fn is_success(&self) -> bool {
780        matches!(self, FetchResult::Success { .. })
781    }
782
783    pub fn success(self) -> Option<(Vec<RefUpdate>, HashSet<NodeId>)> {
784        match self {
785            Self::Success {
786                updated,
787                namespaces,
788                ..
789            } => Some((updated, namespaces)),
790            _ => None,
791        }
792    }
793
794    pub fn find_updated(&self, name: &git::fmt::RefStr) -> Option<RefUpdate> {
795        let updated = match self {
796            Self::Success { updated, .. } => Some(updated),
797            _ => None,
798        }?;
799        updated.iter().find(|up| up.name() == name).cloned()
800    }
801}
802
803impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>> for FetchResult {
804    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>) -> Self {
805        match value {
806            Ok((updated, namespaces, clone)) => Self::Success {
807                updated,
808                namespaces,
809                clone,
810            },
811            Err(err) => Self::Failed {
812                reason: err.to_string(),
813            },
814        }
815    }
816}
817
818/// Holds multiple fetch results.
819#[derive(Clone, Debug, Default)]
820pub struct FetchResults(Vec<(NodeId, FetchResult)>);
821
822impl FetchResults {
823    /// Push a fetch result.
824    pub fn push(&mut self, nid: NodeId, result: FetchResult) {
825        self.0.push((nid, result));
826    }
827
828    /// Check if the results contains the given NID.
829    pub fn contains(&self, nid: &NodeId) -> bool {
830        self.0.iter().any(|(n, _)| n == nid)
831    }
832
833    /// Get a node's result.
834    pub fn get(&self, nid: &NodeId) -> Option<&FetchResult> {
835        self.0.iter().find(|(n, _)| n == nid).map(|(_, r)| r)
836    }
837
838    /// Iterate over all fetch results.
839    pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &FetchResult)> {
840        self.0.iter().map(|(nid, r)| (nid, r))
841    }
842
843    /// Iterate over successful fetches.
844    pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate], HashSet<NodeId>)> {
845        self.0.iter().filter_map(|(nid, r)| {
846            if let FetchResult::Success {
847                updated,
848                namespaces,
849                ..
850            } = r
851            {
852                Some((nid, updated.as_slice(), namespaces.clone()))
853            } else {
854                None
855            }
856        })
857    }
858
859    /// Iterate over failed fetches.
860    pub fn failed(&self) -> impl Iterator<Item = (&NodeId, &str)> {
861        self.0.iter().filter_map(|(nid, r)| {
862            if let FetchResult::Failed { reason } = r {
863                Some((nid, reason.as_str()))
864            } else {
865                None
866            }
867        })
868    }
869}
870
871impl From<Vec<(NodeId, FetchResult)>> for FetchResults {
872    fn from(value: Vec<(NodeId, FetchResult)>) -> Self {
873        Self(value)
874    }
875}
876
877impl Deref for FetchResults {
878    type Target = [(NodeId, FetchResult)];
879
880    fn deref(&self) -> &Self::Target {
881        self.0.as_slice()
882    }
883}
884
885impl IntoIterator for FetchResults {
886    type Item = (NodeId, FetchResult);
887    type IntoIter = std::vec::IntoIter<(NodeId, FetchResult)>;
888
889    fn into_iter(self) -> Self::IntoIter {
890        self.0.into_iter()
891    }
892}
893
/// Error returned by [`Handle`] functions.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An I/O error.
    #[error("i/o: {0}")]
    Io(#[from] io::Error),
    /// An error described by a message.
    #[error("node: {0}")]
    Node(String),
    /// Reading from the control socket timed out.
    #[error("timed out reading from control socket")]
    TimedOut,
    /// The node control socket at the given path could not be opened.
    #[error("failed to open node control socket {0:?} ({1})")]
    Connect(PathBuf, io::ErrorKind),
    /// A command failed for the given reason.
    #[error("command error: {reason}")]
    Command { reason: String },
    /// The response to a command was not valid JSON.
    #[error("received invalid json `{response}` in response to command: {error}")]
    InvalidJson {
        /// The response as received.
        response: String,
        /// The JSON error.
        error: json::Error,
    },
    /// The response to a command was empty.
    #[error("received empty response for command")]
    EmptyResponse,
}
915
916impl Error {
917    /// Check if the error is due to the not being able to connect to the local node.
918    pub fn is_connection_err(&self) -> bool {
919        matches!(self, Self::Connect { .. })
920    }
921}
922
/// Outcome of a [`Handle::connect`] call.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `status` key, in camelCase.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum ConnectResult {
    /// The peer is connected.
    Connected,
    /// The peer is disconnected, for the given reason.
    Disconnected { reason: String },
}
930
931/// A handle to send commands to the node or request information.
932pub trait Handle: Clone + Sync + Send {
933    /// The peer sessions type.
934    type Sessions;
935    type Events: IntoIterator<Item = Self::Event>;
936    type Event;
937    /// The error returned by all methods.
938    type Error: std::error::Error + Send + Sync + 'static;
939
940    /// Get the local Node ID.
941    fn nid(&self) -> Result<NodeId, Self::Error>;
942    /// Check if the node is running.
943    fn is_running(&self) -> bool;
944    /// Get the node's bound listen addresses.
945    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error>;
946    /// Get the current node configuration.
947    fn config(&self) -> Result<config::Config, Self::Error>;
948    /// Connect to a peer.
949    fn connect(
950        &mut self,
951        node: NodeId,
952        addr: Address,
953        opts: ConnectOptions,
954    ) -> Result<ConnectResult, Self::Error>;
955    /// Disconnect from a peer.
956    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error>;
957
958    /// Look up the seeds of a given repository in the routing table.
959    #[deprecated(note = "use `seeds_for` instead")]
960    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
961        self.seeds_for(id, [self.nid()?])
962    }
963
964    /// Look up the seeds of a given repository in the routing table
965    /// and report sync status for `namespaces`.
966    fn seeds_for(
967        &mut self,
968        id: RepoId,
969        namespaces: impl IntoIterator<Item = PublicKey>,
970    ) -> Result<Seeds, Self::Error>;
971
972    /// Fetch a repository from the network.
973    fn fetch(
974        &mut self,
975        id: RepoId,
976        from: NodeId,
977        timeout: time::Duration,
978        signed_references_minimum_feature_level: Option<FeatureLevel>,
979    ) -> Result<FetchResult, Self::Error>;
980    /// Start seeding the given repo. May update the scope. Does nothing if the
981    /// repo is already seeded.
982    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Self::Error>;
983    /// Start following the given peer.
984    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>;
985    /// Set the following policy to block for the given peer.
986    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error>;
987    /// Un-seed the given repo and delete it from storage.
988    fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
989    /// Unfollow the given peer.
990    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error>;
991
992    /// Notify the service that a repository has been updated, and references
993    /// should be announced over the network.
994    #[deprecated(note = "use `announce_refs_for` instead")]
995    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
996        self.announce_refs_for(id, [self.nid()?])
997    }
998
999    /// Notify the service that a repository has been updated, and references
1000    /// for the given `namespaces` should be announced over the network.
1001    fn announce_refs_for(
1002        &mut self,
1003        id: RepoId,
1004        namespaces: impl IntoIterator<Item = PublicKey>,
1005    ) -> Result<RefsAt, Self::Error>;
1006
1007    /// Announce local inventory.
1008    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
1009    /// Notify the service that our inventory was updated with the given repository.
1010    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Self::Error>;
    /// Ask the service to shut down. Consumes the handle.
    fn shutdown(self) -> Result<(), Self::Error>;
1013    /// Query the peer session state.
1014    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
1015    /// Query the state of a peer session. Returns [`None`] if no session was found.
1016    fn session(&self, node: NodeId) -> Result<Option<Session>, Self::Error>;
    /// Subscribe to node events.
    ///
    /// NOTE(review): `timeout` is presumably how long to wait for an event to
    /// be received on the returned channel (see
    /// [`DEFAULT_SUBSCRIBE_TIMEOUT`]) -- confirm for each implementation.
    fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
1019    /// Return debug information as a JSON value.
1020    fn debug(&self) -> Result<json::Value, Self::Error>;
1021}
1022
/// Iterator of results `T` when passing a [`Command`] to [`Node::call`].
///
/// The iterator blocks for a `timeout` duration, returning [`Error::TimedOut`]
/// if the duration is reached.
pub struct LineIter<T> {
    /// Buffered reader over the socket the response lines are read from.
    stream: BufReader<UnixStream>,
    /// Read timeout applied to the socket before every line is read.
    timeout: time::Duration,
    /// Marker for the type `T` that each response line is decoded into.
    witness: PhantomData<T>,
}
1032
1033impl<T: DeserializeOwned> Iterator for LineIter<T> {
1034    type Item = Result<T, Error>;
1035
1036    fn next(&mut self) -> Option<Self::Item> {
1037        let mut l = String::new();
1038
1039        self.stream
1040            .get_ref()
1041            .set_read_timeout(Some(self.timeout))
1042            .ok();
1043
1044        match self.stream.read_line(&mut l) {
1045            Ok(0) => None,
1046            Ok(_) => {
1047                let result: CommandResult<T> = match json::from_str(&l) {
1048                    Err(e) => {
1049                        return Some(Err(Error::InvalidJson {
1050                            response: l.clone(),
1051                            error: e,
1052                        }))
1053                    }
1054                    Ok(result) => result,
1055                };
1056                match result {
1057                    CommandResult::Okay(result) => Some(Ok(result)),
1058                    CommandResult::Error { reason } => Some(Err(Error::Command { reason })),
1059                }
1060            }
1061            Err(e) => match e.kind() {
1062                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Some(Err(Error::TimedOut)),
1063                _ => Some(Err(Error::Io(e))),
1064            },
1065        }
1066    }
1067}
1068
/// Node controller.
///
/// Holds the path of the socket through which commands are sent to the node.
#[derive(Debug, Clone)]
pub struct Node {
    /// Path of the socket that [`Node::call`] connects to.
    socket: PathBuf,
}
1074
1075impl Node {
1076    /// Connect to the node, via the socket at the given path.
1077    pub fn new<P: AsRef<Path>>(path: P) -> Self {
1078        Self {
1079            socket: path.as_ref().to_path_buf(),
1080        }
1081    }
1082
1083    /// Call a command on the node.
1084    pub fn call<T: DeserializeOwned + Send + 'static>(
1085        &self,
1086        cmd: Command,
1087        timeout: time::Duration,
1088    ) -> Result<LineIter<T>, Error> {
1089        let mut stream = UnixStream::connect(&self.socket)
1090            .map_err(|e| Error::Connect(self.socket.clone(), e.kind()))?;
1091        cmd.to_writer(&mut stream)?;
1092        Ok(LineIter {
1093            stream: BufReader::new(stream),
1094            timeout,
1095            witness: PhantomData,
1096        })
1097    }
1098
1099    /// Announce refs of the given `rid` to the given seeds.
1100    /// Waits for the seeds to acknowledge the refs or times out if no acknowledgments are received
1101    /// within the given time.
1102    pub fn announce(
1103        &mut self,
1104        rid: RepoId,
1105        namespaces: impl IntoIterator<Item = PublicKey>,
1106        timeout: time::Duration,
1107        mut announcer: sync::Announcer,
1108        mut report: impl FnMut(&NodeId, sync::announce::Progress),
1109    ) -> Result<sync::AnnouncerResult, Error> {
1110        let mut events = self.subscribe(timeout)?;
1111        let refs = self.announce_refs_for(rid, namespaces)?;
1112
1113        let started = time::Instant::now();
1114
1115        loop {
1116            let Some(e) = events.next() else {
1117                // Consider the announcement as timed out if there are no more
1118                // events
1119                return Ok(announcer.timed_out());
1120            };
1121            let elapsed = started.elapsed();
1122            if elapsed >= timeout {
1123                return Ok(announcer.timed_out());
1124            }
1125            match e {
1126                Ok(Event::RefsSynced {
1127                    remote,
1128                    rid: rid_,
1129                    at,
1130                }) if rid == rid_ && refs.at == at => {
1131                    log::debug!(target: "radicle", "Received {e:?}");
1132                    match announcer.synced_with(remote, elapsed) {
1133                        ControlFlow::Continue(progress) => {
1134                            report(&remote, progress);
1135                        }
1136                        ControlFlow::Break(finished) => {
1137                            return Ok(finished.into());
1138                        }
1139                    }
1140                }
1141                Ok(_) => {}
1142
1143                Err(Error::TimedOut) => {
1144                    return Ok(announcer.timed_out());
1145                }
1146                Err(e) => return Err(e),
1147            }
1148            // Ensure that the announcer is still waiting for nodes to be
1149            // in-sync with
1150            announcer = match announcer.can_continue() {
1151                ControlFlow::Continue(cont) => cont,
1152                ControlFlow::Break(finished) => return Ok(finished.into()),
1153            };
1154        }
1155    }
1156}
1157
1158// TODO(finto): repo_policies, node_policies, and routing should all
1159// attempt to return iterators instead of allocating vecs.
1160impl Handle for Node {
1161    type Sessions = Vec<Session>;
1162    type Events = LineIter<Event>;
1163    type Event = Result<Event, Error>;
1164    type Error = Error;
1165
1166    fn nid(&self) -> Result<NodeId, Error> {
1167        self.call::<NodeId>(Command::NodeId, DEFAULT_TIMEOUT)?
1168            .next()
1169            .ok_or(Error::EmptyResponse)?
1170    }
1171
1172    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Error> {
1173        self.call::<Vec<net::SocketAddr>>(Command::ListenAddrs, DEFAULT_TIMEOUT)?
1174            .next()
1175            .ok_or(Error::EmptyResponse)?
1176    }
1177
1178    fn is_running(&self) -> bool {
1179        let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
1180            return false;
1181        };
1182
1183        let Some(Ok(_)) = lines.next() else {
1184            return false;
1185        };
1186        true
1187    }
1188
1189    fn config(&self) -> Result<config::Config, Error> {
1190        self.call::<config::Config>(Command::Config, DEFAULT_TIMEOUT)?
1191            .next()
1192            .ok_or(Error::EmptyResponse)?
1193    }
1194
1195    fn connect(
1196        &mut self,
1197        nid: NodeId,
1198        addr: Address,
1199        opts: ConnectOptions,
1200    ) -> Result<ConnectResult, Error> {
1201        let timeout = opts.timeout;
1202        let result = self
1203            .call::<ConnectResult>(
1204                Command::Connect {
1205                    addr: (nid, addr).into(),
1206                    opts,
1207                },
1208                timeout,
1209            )?
1210            .next()
1211            .ok_or(Error::EmptyResponse)??;
1212
1213        Ok(result)
1214    }
1215
1216    fn disconnect(&mut self, nid: NodeId) -> Result<(), Self::Error> {
1217        self.call::<ConnectResult>(Command::Disconnect { nid }, DEFAULT_TIMEOUT)?
1218            .next()
1219            .ok_or(Error::EmptyResponse)??;
1220
1221        Ok(())
1222    }
1223
1224    fn seeds_for(
1225        &mut self,
1226        rid: RepoId,
1227        namespaces: impl IntoIterator<Item = PublicKey>,
1228    ) -> Result<Seeds, Error> {
1229        let seeds = self
1230            .call::<Seeds>(
1231                Command::SeedsFor {
1232                    rid,
1233                    namespaces: HashSet::from_iter(namespaces),
1234                },
1235                DEFAULT_TIMEOUT,
1236            )?
1237            .next()
1238            .ok_or(Error::EmptyResponse)??;
1239
1240        Ok(seeds.with(profile::env::rng()))
1241    }
1242
1243    fn fetch(
1244        &mut self,
1245        rid: RepoId,
1246        from: NodeId,
1247        timeout: time::Duration,
1248        signed_references_minimum_feature_level: Option<FeatureLevel>,
1249    ) -> Result<FetchResult, Error> {
1250        let result = self
1251            .call(
1252                Command::Fetch {
1253                    rid,
1254                    nid: from,
1255                    timeout,
1256                    signed_references_minimum_feature_level,
1257                },
1258                DEFAULT_TIMEOUT.max(timeout),
1259            )?
1260            .next()
1261            .ok_or(Error::EmptyResponse)??;
1262
1263        Ok(result)
1264    }
1265
1266    fn follow(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
1267        let mut lines = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
1268        let response = lines.next().ok_or(Error::EmptyResponse)??;
1269
1270        Ok(response.updated)
1271    }
1272
1273    fn block(&mut self, nid: NodeId) -> Result<bool, Error> {
1274        let mut lines = self.call::<Success>(Command::Block { nid }, DEFAULT_TIMEOUT)?;
1275        let response = lines.next().ok_or(Error::EmptyResponse)??;
1276
1277        Ok(response.updated)
1278    }
1279
1280    fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
1281        let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
1282        let response = lines.next().ok_or(Error::EmptyResponse)??;
1283
1284        Ok(response.updated)
1285    }
1286
1287    fn unfollow(&mut self, nid: NodeId) -> Result<bool, Error> {
1288        let mut lines = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
1289        let response = lines.next().ok_or(Error::EmptyResponse)??;
1290
1291        Ok(response.updated)
1292    }
1293
1294    fn unseed(&mut self, rid: RepoId) -> Result<bool, Error> {
1295        let mut lines = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
1296        let response = lines.next().ok_or(Error::EmptyResponse)??;
1297
1298        Ok(response.updated)
1299    }
1300
1301    fn announce_refs_for(
1302        &mut self,
1303        rid: RepoId,
1304        namespaces: impl IntoIterator<Item = PublicKey>,
1305    ) -> Result<RefsAt, Error> {
1306        let refs: RefsAt = self
1307            .call(
1308                Command::AnnounceRefsFor {
1309                    rid,
1310                    namespaces: HashSet::from_iter(namespaces),
1311                },
1312                DEFAULT_TIMEOUT,
1313            )?
1314            .next()
1315            .ok_or(Error::EmptyResponse)??;
1316
1317        Ok(refs)
1318    }
1319
1320    fn announce_inventory(&mut self) -> Result<(), Error> {
1321        for line in self.call::<Success>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? {
1322            line?;
1323        }
1324        Ok(())
1325    }
1326
1327    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1328        let mut lines = self.call::<Success>(Command::AddInventory { rid }, DEFAULT_TIMEOUT)?;
1329        let response = lines.next().ok_or(Error::EmptyResponse)??;
1330
1331        Ok(response.updated)
1332    }
1333
1334    fn subscribe(&self, timeout: time::Duration) -> Result<LineIter<Event>, Error> {
1335        self.call(Command::Subscribe, timeout)
1336    }
1337
1338    fn sessions(&self) -> Result<Self::Sessions, Error> {
1339        let sessions = self
1340            .call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)?
1341            .next()
1342            .ok_or(Error::EmptyResponse)??;
1343
1344        Ok(sessions)
1345    }
1346
1347    fn session(&self, nid: NodeId) -> Result<Option<Session>, Error> {
1348        let session = self
1349            .call::<Option<Session>>(Command::Session { nid }, DEFAULT_TIMEOUT)?
1350            .next()
1351            .ok_or(Error::EmptyResponse)??;
1352
1353        Ok(session)
1354    }
1355
1356    fn debug(&self) -> Result<json::Value, Self::Error> {
1357        let debug = self
1358            .call::<json::Value>(Command::Debug, DEFAULT_TIMEOUT)?
1359            .next()
1360            .ok_or(Error::EmptyResponse {})??;
1361
1362        Ok(debug)
1363    }
1364
1365    fn shutdown(self) -> Result<(), Error> {
1366        for line in self.call::<Success>(Command::Shutdown, DEFAULT_TIMEOUT)? {
1367            line?;
1368        }
1369        // Wait until the shutdown has completed.
1370        while self.is_running() {
1371            thread::sleep(time::Duration::from_secs(1));
1372        }
1373        Ok(())
1374    }
1375}
1376
/// A trait for different sources which can potentially return an alias.
pub trait AliasStore {
    /// Returns the alias of the given [`NodeId`], or `None` if this source
    /// has no alias for it.
    fn alias(&self, nid: &NodeId) -> Option<Alias>;

    /// Return all the [`NodeId`]s that match the `alias`.
    ///
    /// Note that the implementation may choose to allow the alias to be a
    /// substring for more dynamic queries. This is why a `BTreeMap` is
    /// returned: it maps each full [`Alias`] that matched to its [`NodeId`]s.
    fn reverse_lookup(&self, alias: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>>;
}
1389
1390impl AliasStore for HashMap<NodeId, Alias> {
1391    fn alias(&self, nid: &NodeId) -> Option<Alias> {
1392        self.get(nid).map(ToOwned::to_owned)
1393    }
1394
1395    fn reverse_lookup(&self, needle: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>> {
1396        self.iter()
1397            .fold(BTreeMap::new(), |mut result, (node, alias)| {
1398                if alias.contains(needle.as_str()) {
1399                    let nodes = result.entry(alias.clone()).or_default();
1400                    nodes.insert(*node);
1401                }
1402                result
1403            })
1404    }
1405}
1406
#[cfg(test)]
pub(crate) mod properties {
    use std::collections::BTreeSet;

    use crate::node::{Alias, NodeId};
    use crate::test::arbitrary;

    use super::AliasStore;

    /// Input for [`test_reverse_lookup`]: two aliases, where `short` is a
    /// prefix of `long`, each paired with a set of generated [`NodeId`]s.
    pub struct AliasInput {
        short: (Alias, BTreeSet<NodeId>),
        long: (Alias, BTreeSet<NodeId>),
    }

    impl AliasInput {
        /// Generate a fresh input. The `long` alias is built by appending a
        /// second generated alias to `short`.
        pub fn new() -> Self {
            let short = arbitrary::gen::<Alias>(0);
            // Ensure we have a second, unique alias
            let mut extended = short.to_string();
            extended.push_str(arbitrary::gen::<Alias>(1).as_str());
            let long = Alias::new(extended);

            let short_ids = arbitrary::vec::<NodeId>(3).into_iter().collect();
            let long_ids = arbitrary::vec::<NodeId>(2).into_iter().collect();

            Self {
                short: (short, short_ids),
                long: (long, long_ids),
            }
        }

        /// The shorter alias and its node identifiers.
        pub fn short(&self) -> &(Alias, BTreeSet<NodeId>) {
            &self.short
        }

        /// The longer alias and its node identifiers.
        pub fn long(&self) -> &(Alias, BTreeSet<NodeId>) {
            &self.long
        }
    }

    /// Given the `AliasInput` ensure that the lookup of `NodeId`s for two
    /// aliases works as intended.
    ///
    /// The `short` alias is a prefix of the `long` alias, so when looking up
    /// the `short` alias, both sets of results will return. For the `long`
    /// alias, only its results will return.
    ///
    /// It is also expected that the lookup is case insensitive.
    pub fn test_reverse_lookup(store: &impl AliasStore, AliasInput { short, long }: AliasInput) {
        let (short, short_ids) = short;
        let (long, long_ids) = long;

        let by_short = store.reverse_lookup(&short);
        // We get back the results for `short`
        assert_eq!(by_short.get(&short), Some(&short_ids));
        // We also get back the results for `long` since `short` is a prefix of it
        assert_eq!(by_short.get(&long), Some(&long_ids));

        let by_long = store.reverse_lookup(&long);
        // We do not get back a result for `short`: it is only a prefix of
        // `long`, so `long` cannot be found within it
        assert_eq!(by_long.get(&short), None);
        assert_eq!(by_long.get(&long), Some(&long_ids));

        // Alternate upper and lower case to check that case is ignored
        let alternating: String = short
            .as_str()
            .chars()
            .enumerate()
            .map(|(i, c)| {
                if i % 2 == 0 {
                    c.to_ascii_uppercase()
                } else {
                    c.to_ascii_lowercase()
                }
            })
            .collect();
        let mixed_case = Alias::new(alternating);

        let by_mixed_case = store.reverse_lookup(&mixed_case);
        assert!(by_mixed_case.contains_key(&short));
    }
}
1485
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
    use super::*;
    use crate::assert_matches;

    /// User agents are accepted or rejected as expected.
    #[test]
    fn test_user_agent() {
        let valid = [
            "/radicle:1.0.0/",
            "/radicle:1.0.0/heartwood:0.9/",
            "/radicle:1.0.0/heartwood:0.9/rust:1.77/",
            "/radicle:1.0.0-rc.1/",
            "/radicle:@a.b.c/",
            "/radicle/",
            "/rad/icle/",
            "/rad:ic/le/",
        ];
        let invalid = [
            "/:/",
            "//",
            "",
            "radicle:1.0.0/",
            "/radicle:1.0.0",
            "/radi cle:1.0/",
            "/radi\ncle:1.0/",
        ];

        for input in valid {
            assert!(
                UserAgent::from_str(input).is_ok(),
                "user agent {input:?} should be accepted"
            );
        }
        for input in invalid {
            assert!(
                UserAgent::from_str(input).is_err(),
                "user agent {input:?} should be rejected"
            );
        }
    }

    /// Aliases are accepted or rejected as expected.
    #[test]
    fn test_alias() {
        let valid = ["cloudhead", "cloud-head", "cl0ud.h3ad$__", "©loudhèâd"];
        let invalid = [
            "",
            " ",
            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "cloud\0head",
            "cloud head",
            "cloudhead\n",
        ];

        for input in valid {
            assert!(
                Alias::from_str(input).is_ok(),
                "alias {input:?} should be accepted"
            );
        }
        for input in invalid {
            assert!(
                Alias::from_str(input).is_err(),
                "alias {input:?} should be rejected"
            );
        }
    }

    /// Addresses are accepted or rejected as expected.
    #[test]
    fn test_address() {
        let valid = [
            "127.0.0.1:8776",
            "[::1]:8776",
            "[::ffff:127.0.0.1]:8776",
            "localhost:8776",
            "::1:8776", // Backwards-compatibility
        ];
        let invalid = [
            "",
            ":",
            "127.0.0.1",
            "127.0.0.1:xyz",
            "[invalid]:8776",
            "[127.0.0.1]:8776",
        ];

        for input in valid {
            assert!(
                Address::from_str(input).is_ok(),
                "address {input:?} should be accepted"
            );
        }
        for input in invalid {
            assert!(
                Address::from_str(input).is_err(),
                "address {input:?} should be rejected"
            );
        }
    }

    /// Command results serialize to, and deserialize from, the expected JSON.
    #[test]
    fn test_command_result() {
        #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
        struct Test {
            value: u32,
        }

        assert_eq!(json::to_string(&CommandResult::Okay(true)).unwrap(), "true");
        assert_eq!(
            json::to_string(&CommandResult::Okay(Test { value: 42 })).unwrap(),
            "{\"value\":42}"
        );
        assert_eq!(
            json::from_str::<CommandResult<Test>>("{\"value\":42}").unwrap(),
            CommandResult::Okay(Test { value: 42 })
        );
        assert_eq!(json::to_string(&CommandResult::ok()).unwrap(), "{}");
        assert_eq!(
            json::to_string(&CommandResult::updated(true)).unwrap(),
            "{\"updated\":true}"
        );
        assert_eq!(
            json::to_string(&CommandResult::error(io::Error::from(
                io::ErrorKind::NotFound
            )))
            .unwrap(),
            "{\"error\":\"entity not found\"}"
        );

        // A connected state must survive a serialization round-trip
        let connected = json::to_string(&CommandResult::Okay(State::Connected {
            since: LocalTime::now(),
            ping: Default::default(),
            latencies: VecDeque::default(),
            stable: false,
        }))
        .unwrap();
        json::from_str::<CommandResult<State>>(&connected).unwrap();

        assert_matches!(
            json::from_str::<CommandResult<State>>(
                r#"{"connected":{"since":1699636852107,"fetching":[]}}"#
            ),
            Ok(CommandResult::Okay(_))
        );
        assert_matches!(
            json::from_str::<CommandResult<Seeds>>(
                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
            ),
            Ok(CommandResult::Okay(_))
        );
    }
}