#![allow(clippy::type_complexity)]
#![allow(clippy::collapsible_if)]
mod features;
pub mod address;
pub mod command;
pub mod config;
pub mod db;
pub mod device;
pub mod events;
pub mod notifications;
pub mod policy;
pub mod refs;
pub mod routing;
pub mod seed;
pub mod sync;
pub mod timestamp;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Display;
use std::io::{BufRead, BufReader};
use std::marker::PhantomData;
use std::net::IpAddr;
use std::net::Ipv6Addr;
use std::ops::{ControlFlow, Deref};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::{fmt, io, net, thread, time};
#[cfg(unix)]
use std::os::unix::net::UnixStream;
#[cfg(windows)]
use uds_windows::UnixStream;
use amplify::WrapperMut;
use cyphernet::addr::{AddrParseError, NetAddr};
use localtime::{LocalDuration, LocalTime};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json as json;
use crate::crypto::PublicKey;
use crate::git;
use crate::identity::RepoId;
use crate::profile;
use crate::storage::refs::{FeatureLevel, RefsAt};
use crate::storage::RefUpdate;
pub use address::KnownAddress;
pub use command::{Command, CommandResult, ConnectOptions, Success, DEFAULT_TIMEOUT};
pub use config::Config;
pub use cyphernet::addr::{HostName, PeerAddr, PeerAddrParseError};
pub use db::Database;
pub use events::{Event, Events};
pub use features::Features;
pub use radicle_core::NodeId;
pub use seed::SyncedAt;
pub use timestamp::Timestamp;
pub const PROTOCOL_VERSION: u8 = 1;
pub const DEFAULT_PORT: u16 = 8776;
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
pub const MAX_ALIAS_LENGTH: usize = 32;
pub const PENALTY_CONNECT_THRESHOLD: u8 = 32;
pub const PENALTY_BAN_THRESHOLD: u8 = 64;
pub const NODE_DB_FILE: &str = "node.db";
pub const POLICIES_DB_FILE: &str = "policies.db";
pub const NOTIFICATIONS_DB_FILE: &str = "notifications.db";
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PingState {
#[default]
None,
AwaitingResponse {
len: u16,
since: LocalTime,
},
Ok,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum State {
Initial,
Attempted,
#[serde(rename_all = "camelCase")]
Connected {
since: LocalTime,
#[serde(skip)]
ping: PingState,
#[serde(skip)]
latencies: VecDeque<LocalDuration>,
#[serde(skip)]
stable: bool,
},
#[serde(rename_all = "camelCase")]
Disconnected {
since: LocalTime,
retry_at: LocalTime,
},
}
impl State {
pub fn is_connected(&self) -> bool {
matches!(self, Self::Connected { .. })
}
}
impl fmt::Display for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Initial => {
write!(f, "initial")
}
Self::Attempted { .. } => {
write!(f, "attempted")
}
Self::Connected { .. } => {
write!(f, "connected")
}
Self::Disconnected { .. } => {
write!(f, "disconnected")
}
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Severity {
Low = 0,
Medium = 1,
High = 8,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
pub struct Penalty(u8);
impl Penalty {
pub fn is_connect_threshold_reached(&self) -> bool {
self.0 >= PENALTY_CONNECT_THRESHOLD
}
pub fn is_ban_threshold_reached(&self) -> bool {
self.0 >= PENALTY_BAN_THRESHOLD
}
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[serde(tag = "status")]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum SyncStatus {
#[serde(rename_all = "camelCase")]
Synced {
at: SyncedAt,
},
#[serde(rename_all = "camelCase")]
OutOfSync {
local: SyncedAt,
remote: SyncedAt,
},
}
impl Ord for SyncStatus {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match (self, other) {
(Self::Synced { at: left }, Self::Synced { at: right }) => left.cmp(right),
(Self::Synced { at }, Self::OutOfSync { remote, .. }) => at.cmp(remote),
(Self::OutOfSync { remote, .. }, Self::Synced { at }) => remote.cmp(at),
(Self::OutOfSync { remote: left, .. }, Self::OutOfSync { remote: right, .. }) => {
left.cmp(right)
}
}
}
}
impl PartialOrd for SyncStatus {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct UserAgent(String);
impl UserAgent {
pub fn as_str(&self) -> &str {
self.0.as_str()
}
}
impl Default for UserAgent {
fn default() -> Self {
UserAgent(String::from("/radicle/"))
}
}
impl std::fmt::Display for UserAgent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl FromStr for UserAgent {
type Err = String;
fn from_str(input: &str) -> Result<Self, Self::Err> {
let reserved = ['/', ':'];
if input.len() > 64 {
return Err(input.to_owned());
}
let Some(s) = input.strip_prefix('/') else {
return Err(input.to_owned());
};
let Some(s) = s.strip_suffix('/') else {
return Err(input.to_owned());
};
if s.is_empty() {
return Err(input.to_owned());
}
if s.split('/').all(|segment| {
if let Some((client, version)) = segment.split_once(':') {
if client.is_empty() || version.is_empty() {
false
} else {
let client = client
.chars()
.all(|c| c.is_ascii_graphic() && !reserved.contains(&c));
let version = version
.chars()
.all(|c| c.is_ascii_graphic() || !reserved.contains(&c));
client && version
}
} else {
true
}
}) {
Ok(Self(input.to_owned()))
} else {
Err(input.to_owned())
}
}
}
impl AsRef<str> for UserAgent {
fn as_ref(&self) -> &str {
self.0.as_str()
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Alias(
#[cfg_attr(
feature = "schemars",
schemars(regex(pattern = r"^[^\x00-\x1F\x7F-\x9F\s]{0,32}$"), length(max = 32))
)]
String,
);
impl Alias {
pub fn new(alias: impl ToString) -> Self {
let alias = alias.to_string();
match Self::from_str(&alias) {
Ok(a) => a,
Err(e) => panic!("Alias::new: {e}"),
}
}
pub fn as_str(&self) -> &str {
self.0.as_str()
}
}
impl From<Alias> for String {
fn from(value: Alias) -> Self {
value.0
}
}
impl From<&NodeId> for Alias {
fn from(nid: &NodeId) -> Self {
Alias(nid.to_string())
}
}
impl fmt::Display for Alias {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl Deref for Alias {
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AsRef<str> for Alias {
fn as_ref(&self) -> &str {
self.0.as_str()
}
}
impl From<&Alias> for [u8; 32] {
fn from(input: &Alias) -> [u8; 32] {
let mut alias = [0u8; 32];
alias[..input.len()].copy_from_slice(input.as_bytes());
alias
}
}
#[derive(thiserror::Error, Debug)]
pub enum AliasError {
#[error("alias cannot be empty")]
Empty,
#[error("alias cannot be greater than {MAX_ALIAS_LENGTH} bytes")]
MaxBytesExceeded,
#[error("alias cannot contain whitespace or control characters")]
InvalidCharacter,
}
impl FromStr for Alias {
type Err = AliasError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.is_empty() {
return Err(AliasError::Empty);
}
if s.chars().any(|c| c.is_control() || c.is_whitespace()) {
return Err(AliasError::InvalidCharacter);
}
if s.len() > MAX_ALIAS_LENGTH {
return Err(AliasError::MaxBytesExceeded);
}
Ok(Self(s.to_owned()))
}
}
impl TryFrom<String> for Alias {
type Error = AliasError;
fn try_from(value: String) -> Result<Self, Self::Error> {
Alias::from_str(&value)
}
}
impl TryFrom<&sqlite::Value> for Alias {
type Error = sqlite::Error;
fn try_from(value: &sqlite::Value) -> Result<Self, Self::Error> {
match value {
sqlite::Value::String(s) => Self::from_str(s).map_err(|e| sqlite::Error {
code: None,
message: Some(e.to_string()),
}),
_ => Err(sqlite::Error {
code: None,
message: Some(format!(
"sql: invalid type {:?} for alias, expected {:?}",
value.kind(),
sqlite::Type::String
)),
}),
}
}
}
#[derive(Clone, Eq, PartialEq, Debug, Hash, From, Wrapper, WrapperMut, Serialize, Deserialize)]
#[wrapper(Deref)]
#[wrapper_mut(DerefMut)]
#[serde(try_from = "String", into = "String")]
#[cfg_attr(
feature = "schemars",
derive(schemars::JsonSchema),
schemars(description = "\
An IP address, or a DNS name, or a Tor onion name, followed by the symbol ':', \
followed by a TCP port number.",
extend(
"examples" = [
"xmrhfasfg5suueegrnc4gsgyi2tyclcy5oz7f5drnrodmdtob6t2ioyd.onion:8776",
"seed.example.com:8776",
"192.0.2.0:31337",
],
"pattern" = "^.+:((6553[0-5])|(655[0-2][0-9])|(65[0-4][0-9]{2})|(6[0-4][0-9]{3})|([1-5][0-9]{4})|([0-5]{0,5})|([0-9]{1,4}))$",
),
))]
pub struct Address(NetAddr<HostName>);
impl Address {
pub fn is_local(&self) -> bool {
match &self.0.host {
HostName::Ip(ip) => address::is_local(ip),
HostName::Dns(name) => {
let name = name.strip_suffix(".").unwrap_or(name);
name.ends_with(".localhost") || name == "localhost"
}
_ => false,
}
}
pub fn is_routable(&self) -> bool {
match self.0.host {
HostName::Ip(ip) => address::is_routable(&ip),
HostName::Dns(_) => !self.is_local(),
_ => true,
}
}
pub fn host(&self) -> &HostName {
&self.0.host
}
pub fn is_onion(&self) -> bool {
match self.0.host {
HostName::Tor(_) => true,
_ => false,
}
}
pub fn port(&self) -> u16 {
self.0.port
}
pub fn display_compact(&self) -> impl Display {
let host = match self.host() {
HostName::Ip(IpAddr::V4(ip)) => ip.to_string(),
HostName::Ip(IpAddr::V6(ip)) => format!("[{ip}]"),
HostName::Dns(dns) => dns.clone(),
HostName::Tor(onion) => {
let onion = onion.to_string();
let start = onion.chars().take(8).collect::<String>();
let end = onion
.chars()
.skip(onion.len() - 8 - ".onion".len())
.collect::<String>();
format!("{start}…{end}")
}
_ => unreachable!(),
};
let port = self.port().to_string();
format!("{host}:{port}")
}
}
impl Display for Address {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.host() {
HostName::Ip(IpAddr::V6(ip)) => {
write!(f, "[{ip}]:{}", self.port())
}
_ => self.0.fmt(f),
}
}
}
impl TryFrom<String> for Address {
type Error = <Self as FromStr>::Err;
fn try_from(value: String) -> Result<Self, Self::Error> {
value.parse()
}
}
impl From<Address> for String {
fn from(value: Address) -> Self {
value.to_string()
}
}
impl FromStr for Address {
type Err = AddrParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (host, port) = s.rsplit_once(':').ok_or(AddrParseError::PortAbsent)?;
let host = if let Some(host) = host
.strip_prefix('[')
.and_then(|host| host.strip_suffix(']'))
{
HostName::Ip(host.parse::<Ipv6Addr>()?.into())
} else {
host.parse().map(|host| match host {
HostName::Ip(IpAddr::V6(addr)) => {
log::warn!("Address format will change in the future. '{s}' should be changed to '[{addr}]:{port}' to stay compatible. Refer to RFC 5926, Sec. 6 as well as RFC 3986, Sec. D.1. and RFC 2732, Sec. 2.");
host
},
host => host,
})?
};
let port = port.parse().map_err(|_| AddrParseError::InvalidPort)?;
Ok(Self(NetAddr::new(host, port)))
}
}
impl cyphernet::addr::Host for Address {
fn requires_proxy(&self) -> bool {
self.0.requires_proxy()
}
}
impl cyphernet::addr::Addr for Address {
fn port(&self) -> u16 {
self.0.port()
}
}
impl From<net::SocketAddr> for Address {
fn from(addr: net::SocketAddr) -> Self {
Address(NetAddr {
host: HostName::Ip(addr.ip()),
port: addr.port(),
})
}
}
impl From<Address> for HostName {
fn from(addr: Address) -> Self {
addr.0.host
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum Link {
Outbound,
Inbound,
}
impl Link {
pub fn is_outbound(&self) -> bool {
matches!(self, Self::Outbound)
}
pub fn is_inbound(&self) -> bool {
matches!(self, Self::Inbound)
}
}
impl std::fmt::Display for Link {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Link::Outbound => write!(f, "outbound"),
Link::Inbound => write!(f, "inbound"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Session {
pub nid: NodeId,
pub link: Link,
pub addr: Address,
pub state: State,
}
impl Session {
pub fn is_connected(&self) -> bool {
self.state.is_connected()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seed {
pub nid: NodeId,
pub addrs: Vec<KnownAddress>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state: Option<State>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sync: Option<SyncStatus>,
}
impl Seed {
pub fn is_connected(&self) -> bool {
matches!(self.state, Some(State::Connected { .. }))
}
pub fn is_synced(&self) -> bool {
matches!(self.sync, Some(SyncStatus::Synced { .. }))
}
pub fn new(
nid: NodeId,
addrs: Vec<KnownAddress>,
state: Option<State>,
sync: Option<SyncStatus>,
) -> Self {
Self {
nid,
addrs,
state,
sync,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(into = "Vec<Seed>", from = "Vec<Seed>")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seeds(
#[cfg_attr(feature = "schemars", schemars(with = "Vec<Seed>"))]
address::AddressBook<NodeId, Seed>,
);
impl Seeds {
pub fn new(rng: fastrand::Rng) -> Self {
Self(address::AddressBook::new(rng))
}
pub fn insert(&mut self, seed: Seed) {
self.0.insert(seed.nid, seed);
}
pub fn contains(&self, nid: &NodeId) -> bool {
self.0.contains_key(nid)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn partition(&self) -> (Vec<Seed>, Vec<Seed>) {
self.iter().cloned().partition(|s| s.is_connected())
}
pub fn connected(&self) -> impl Iterator<Item = &Seed> {
self.iter().filter(|s| s.is_connected())
}
pub fn iter(&self) -> impl Iterator<Item = &Seed> {
self.0.shuffled().map(|(_, v)| v)
}
pub fn is_connected(&self, nid: &NodeId) -> bool {
self.0.get(nid).is_some_and(|s| s.is_connected())
}
pub fn with(self, rng: fastrand::Rng) -> Self {
Self(self.0.with(rng))
}
}
impl From<Seeds> for Vec<Seed> {
fn from(seeds: Seeds) -> Vec<Seed> {
seeds.0.into_shuffled().map(|(_, v)| v).collect()
}
}
impl From<Vec<Seed>> for Seeds {
fn from(other: Vec<Seed>) -> Seeds {
Seeds(address::AddressBook::from_iter(
other.into_iter().map(|s| (s.nid, s)),
))
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum FetchResult {
Success {
updated: Vec<RefUpdate>,
namespaces: HashSet<NodeId>,
clone: bool,
},
Failed {
reason: String,
},
}
impl FetchResult {
pub fn is_success(&self) -> bool {
matches!(self, FetchResult::Success { .. })
}
pub fn success(self) -> Option<(Vec<RefUpdate>, HashSet<NodeId>)> {
match self {
Self::Success {
updated,
namespaces,
..
} => Some((updated, namespaces)),
_ => None,
}
}
pub fn find_updated(&self, name: &git::fmt::RefStr) -> Option<RefUpdate> {
let updated = match self {
Self::Success { updated, .. } => Some(updated),
_ => None,
}?;
updated.iter().find(|up| up.name() == name).cloned()
}
}
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>> for FetchResult {
fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>) -> Self {
match value {
Ok((updated, namespaces, clone)) => Self::Success {
updated,
namespaces,
clone,
},
Err(err) => Self::Failed {
reason: err.to_string(),
},
}
}
}
#[derive(Clone, Debug, Default)]
pub struct FetchResults(Vec<(NodeId, FetchResult)>);
impl FetchResults {
pub fn push(&mut self, nid: NodeId, result: FetchResult) {
self.0.push((nid, result));
}
pub fn contains(&self, nid: &NodeId) -> bool {
self.0.iter().any(|(n, _)| n == nid)
}
pub fn get(&self, nid: &NodeId) -> Option<&FetchResult> {
self.0.iter().find(|(n, _)| n == nid).map(|(_, r)| r)
}
pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &FetchResult)> {
self.0.iter().map(|(nid, r)| (nid, r))
}
pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate], HashSet<NodeId>)> {
self.0.iter().filter_map(|(nid, r)| {
if let FetchResult::Success {
updated,
namespaces,
..
} = r
{
Some((nid, updated.as_slice(), namespaces.clone()))
} else {
None
}
})
}
pub fn failed(&self) -> impl Iterator<Item = (&NodeId, &str)> {
self.0.iter().filter_map(|(nid, r)| {
if let FetchResult::Failed { reason } = r {
Some((nid, reason.as_str()))
} else {
None
}
})
}
}
impl From<Vec<(NodeId, FetchResult)>> for FetchResults {
fn from(value: Vec<(NodeId, FetchResult)>) -> Self {
Self(value)
}
}
impl Deref for FetchResults {
type Target = [(NodeId, FetchResult)];
fn deref(&self) -> &Self::Target {
self.0.as_slice()
}
}
impl IntoIterator for FetchResults {
type Item = (NodeId, FetchResult);
type IntoIter = std::vec::IntoIter<(NodeId, FetchResult)>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("i/o: {0}")]
Io(#[from] io::Error),
#[error("node: {0}")]
Node(String),
#[error("timed out reading from control socket")]
TimedOut,
#[error("failed to open node control socket {0:?} ({1})")]
Connect(PathBuf, io::ErrorKind),
#[error("command error: {reason}")]
Command { reason: String },
#[error("received invalid json `{response}` in response to command: {error}")]
InvalidJson {
response: String,
error: json::Error,
},
#[error("received empty response for command")]
EmptyResponse,
}
impl Error {
pub fn is_connection_err(&self) -> bool {
matches!(self, Self::Connect { .. })
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum ConnectResult {
Connected,
Disconnected { reason: String },
}
pub trait Handle: Clone + Sync + Send {
type Sessions;
type Events: IntoIterator<Item = Self::Event>;
type Event;
type Error: std::error::Error + Send + Sync + 'static;
fn nid(&self) -> Result<NodeId, Self::Error>;
fn is_running(&self) -> bool;
fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error>;
fn config(&self) -> Result<config::Config, Self::Error>;
fn connect(
&mut self,
node: NodeId,
addr: Address,
opts: ConnectOptions,
) -> Result<ConnectResult, Self::Error>;
fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error>;
#[deprecated(note = "use `seeds_for` instead")]
fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
self.seeds_for(id, [self.nid()?])
}
fn seeds_for(
&mut self,
id: RepoId,
namespaces: impl IntoIterator<Item = PublicKey>,
) -> Result<Seeds, Self::Error>;
fn fetch(
&mut self,
id: RepoId,
from: NodeId,
timeout: time::Duration,
signed_references_minimum_feature_level: Option<FeatureLevel>,
) -> Result<FetchResult, Self::Error>;
fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Self::Error>;
fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>;
fn block(&mut self, id: NodeId) -> Result<bool, Self::Error>;
fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error>;
#[deprecated(note = "use `announce_refs_for` instead")]
fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
self.announce_refs_for(id, [self.nid()?])
}
fn announce_refs_for(
&mut self,
id: RepoId,
namespaces: impl IntoIterator<Item = PublicKey>,
) -> Result<RefsAt, Self::Error>;
fn announce_inventory(&mut self) -> Result<(), Self::Error>;
fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Self::Error>;
fn shutdown(self) -> Result<(), Self::Error>;
fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
fn session(&self, node: NodeId) -> Result<Option<Session>, Self::Error>;
fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
fn debug(&self) -> Result<json::Value, Self::Error>;
}
pub struct LineIter<T> {
stream: BufReader<UnixStream>,
timeout: time::Duration,
witness: PhantomData<T>,
}
impl<T: DeserializeOwned> Iterator for LineIter<T> {
type Item = Result<T, Error>;
fn next(&mut self) -> Option<Self::Item> {
let mut l = String::new();
self.stream
.get_ref()
.set_read_timeout(Some(self.timeout))
.ok();
match self.stream.read_line(&mut l) {
Ok(0) => None,
Ok(_) => {
let result: CommandResult<T> = match json::from_str(&l) {
Err(e) => {
return Some(Err(Error::InvalidJson {
response: l.clone(),
error: e,
}))
}
Ok(result) => result,
};
match result {
CommandResult::Okay(result) => Some(Ok(result)),
CommandResult::Error { reason } => Some(Err(Error::Command { reason })),
}
}
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Some(Err(Error::TimedOut)),
_ => Some(Err(Error::Io(e))),
},
}
}
}
#[derive(Debug, Clone)]
pub struct Node {
socket: PathBuf,
}
impl Node {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
Self {
socket: path.as_ref().to_path_buf(),
}
}
pub fn call<T: DeserializeOwned + Send + 'static>(
&self,
cmd: Command,
timeout: time::Duration,
) -> Result<LineIter<T>, Error> {
let mut stream = UnixStream::connect(&self.socket)
.map_err(|e| Error::Connect(self.socket.clone(), e.kind()))?;
cmd.to_writer(&mut stream)?;
Ok(LineIter {
stream: BufReader::new(stream),
timeout,
witness: PhantomData,
})
}
pub fn announce(
&mut self,
rid: RepoId,
namespaces: impl IntoIterator<Item = PublicKey>,
timeout: time::Duration,
mut announcer: sync::Announcer,
mut report: impl FnMut(&NodeId, sync::announce::Progress),
) -> Result<sync::AnnouncerResult, Error> {
let mut events = self.subscribe(timeout)?;
let refs = self.announce_refs_for(rid, namespaces)?;
let started = time::Instant::now();
loop {
let Some(e) = events.next() else {
return Ok(announcer.timed_out());
};
let elapsed = started.elapsed();
if elapsed >= timeout {
return Ok(announcer.timed_out());
}
match e {
Ok(Event::RefsSynced {
remote,
rid: rid_,
at,
}) if rid == rid_ && refs.at == at => {
log::debug!(target: "radicle", "Received {e:?}");
match announcer.synced_with(remote, elapsed) {
ControlFlow::Continue(progress) => {
report(&remote, progress);
}
ControlFlow::Break(finished) => {
return Ok(finished.into());
}
}
}
Ok(_) => {}
Err(Error::TimedOut) => {
return Ok(announcer.timed_out());
}
Err(e) => return Err(e),
}
announcer = match announcer.can_continue() {
ControlFlow::Continue(cont) => cont,
ControlFlow::Break(finished) => return Ok(finished.into()),
};
}
}
}
impl Handle for Node {
type Sessions = Vec<Session>;
type Events = LineIter<Event>;
type Event = Result<Event, Error>;
type Error = Error;
fn nid(&self) -> Result<NodeId, Error> {
self.call::<NodeId>(Command::NodeId, DEFAULT_TIMEOUT)?
.next()
.ok_or(Error::EmptyResponse)?
}
fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Error> {
self.call::<Vec<net::SocketAddr>>(Command::ListenAddrs, DEFAULT_TIMEOUT)?
.next()
.ok_or(Error::EmptyResponse)?
}
fn is_running(&self) -> bool {
let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
return false;
};
let Some(Ok(_)) = lines.next() else {
return false;
};
true
}
fn config(&self) -> Result<config::Config, Error> {
self.call::<config::Config>(Command::Config, DEFAULT_TIMEOUT)?
.next()
.ok_or(Error::EmptyResponse)?
}
fn connect(
&mut self,
nid: NodeId,
addr: Address,
opts: ConnectOptions,
) -> Result<ConnectResult, Error> {
let timeout = opts.timeout;
let result = self
.call::<ConnectResult>(
Command::Connect {
addr: (nid, addr).into(),
opts,
},
timeout,
)?
.next()
.ok_or(Error::EmptyResponse)??;
Ok(result)
}
fn disconnect(&mut self, nid: NodeId) -> Result<(), Self::Error> {
self.call::<ConnectResult>(Command::Disconnect { nid }, DEFAULT_TIMEOUT)?
.next()
.ok_or(Error::EmptyResponse)??;
Ok(())
}
fn seeds_for(
&mut self,
rid: RepoId,
namespaces: impl IntoIterator<Item = PublicKey>,
) -> Result<Seeds, Error> {
let seeds = self
.call::<Seeds>(
Command::SeedsFor {
rid,
namespaces: HashSet::from_iter(namespaces),
},
DEFAULT_TIMEOUT,
)?
.next()
.ok_or(Error::EmptyResponse)??;
Ok(seeds.with(profile::env::rng()))
}
fn fetch(
&mut self,
rid: RepoId,
from: NodeId,
timeout: time::Duration,
signed_references_minimum_feature_level: Option<FeatureLevel>,
) -> Result<FetchResult, Error> {
let result = self
.call(
Command::Fetch {
rid,
nid: from,
timeout,
signed_references_minimum_feature_level,
},
DEFAULT_TIMEOUT.max(timeout),
)?
.next()
.ok_or(Error::EmptyResponse)??;
Ok(result)
}
fn follow(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
let mut lines = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
let response = lines.next().ok_or(Error::EmptyResponse)??;
Ok(response.updated)
}
fn block(&mut self, nid: NodeId) -> Result<bool, Error> {
let mut lines = self.call::<Success>(Command::Block { nid }, DEFAULT_TIMEOUT)?;
let response = lines.next().ok_or(Error::EmptyResponse)??;
Ok(response.updated)
}
fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
let response = lines.next().ok_or(Error::EmptyResponse)??;
Ok(response.updated)
}
fn unfollow(&mut self, nid: NodeId) -> Result<bool, Error> {
let mut lines = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
let response = lines.next().ok_or(Error::EmptyResponse)??;
Ok(response.updated)
}
fn unseed(&mut self, rid: RepoId) -> Result<bool, Error> {
let mut lines = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
let response = lines.next().ok_or(Error::EmptyResponse)??;
Ok(response.updated)
}
fn announce_refs_for(
&mut self,
rid: RepoId,
namespaces: impl IntoIterator<Item = PublicKey>,
) -> Result<RefsAt, Error> {
let refs: RefsAt = self
.call(
Command::AnnounceRefsFor {
rid,
namespaces: HashSet::from_iter(namespaces),
},
DEFAULT_TIMEOUT,
)?
.next()
.ok_or(Error::EmptyResponse)??;
Ok(refs)
}
fn announce_inventory(&mut self) -> Result<(), Error> {
for line in self.call::<Success>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? {
line?;
}
Ok(())
}
fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
let mut lines = self.call::<Success>(Command::AddInventory { rid }, DEFAULT_TIMEOUT)?;
let response = lines.next().ok_or(Error::EmptyResponse)??;
Ok(response.updated)
}
fn subscribe(&self, timeout: time::Duration) -> Result<LineIter<Event>, Error> {
self.call(Command::Subscribe, timeout)
}
fn sessions(&self) -> Result<Self::Sessions, Error> {
let sessions = self
.call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)?
.next()
.ok_or(Error::EmptyResponse)??;
Ok(sessions)
}
fn session(&self, nid: NodeId) -> Result<Option<Session>, Error> {
let session = self
.call::<Option<Session>>(Command::Session { nid }, DEFAULT_TIMEOUT)?
.next()
.ok_or(Error::EmptyResponse)??;
Ok(session)
}
fn debug(&self) -> Result<json::Value, Self::Error> {
let debug = self
.call::<json::Value>(Command::Debug, DEFAULT_TIMEOUT)?
.next()
.ok_or(Error::EmptyResponse {})??;
Ok(debug)
}
fn shutdown(self) -> Result<(), Error> {
for line in self.call::<Success>(Command::Shutdown, DEFAULT_TIMEOUT)? {
line?;
}
while self.is_running() {
thread::sleep(time::Duration::from_secs(1));
}
Ok(())
}
}
pub trait AliasStore {
fn alias(&self, nid: &NodeId) -> Option<Alias>;
fn reverse_lookup(&self, alias: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>>;
}
impl AliasStore for HashMap<NodeId, Alias> {
fn alias(&self, nid: &NodeId) -> Option<Alias> {
self.get(nid).map(ToOwned::to_owned)
}
fn reverse_lookup(&self, needle: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>> {
self.iter()
.fold(BTreeMap::new(), |mut result, (node, alias)| {
if alias.contains(needle.as_str()) {
let nodes = result.entry(alias.clone()).or_default();
nodes.insert(*node);
}
result
})
}
}
#[cfg(test)]
pub(crate) mod properties {
use std::collections::BTreeSet;
use crate::node::{Alias, NodeId};
use crate::test::arbitrary;
use super::AliasStore;
pub struct AliasInput {
short: (Alias, BTreeSet<NodeId>),
long: (Alias, BTreeSet<NodeId>),
}
impl AliasInput {
pub fn new() -> Self {
let short = arbitrary::gen::<Alias>(0);
let long = {
let mut a = short.to_string();
a.push_str(arbitrary::gen::<Alias>(1).as_str());
Alias::new(a)
};
Self {
short: (short, arbitrary::vec::<NodeId>(3).into_iter().collect()),
long: (long, arbitrary::vec::<NodeId>(2).into_iter().collect()),
}
}
pub fn short(&self) -> &(Alias, BTreeSet<NodeId>) {
&self.short
}
pub fn long(&self) -> &(Alias, BTreeSet<NodeId>) {
&self.long
}
}
pub fn test_reverse_lookup(store: &impl AliasStore, AliasInput { short, long }: AliasInput) {
let (short, short_ids) = short;
let (long, long_ids) = long;
let first = store.reverse_lookup(&short);
assert_eq!(first.get(&short), Some(&short_ids),);
assert_eq!(first.get(&long), Some(&long_ids));
let second = store.reverse_lookup(&long);
assert_eq!(second.get(&short), None);
assert_eq!(second.get(&long), Some(&long_ids));
let mixed_case = Alias::new(
short
.as_str()
.chars()
.enumerate()
.map(|(i, c)| {
if i % 2 == 0 {
c.to_ascii_uppercase()
} else {
c.to_ascii_lowercase()
}
})
.collect::<String>(),
);
let upper = store.reverse_lookup(&mixed_case);
assert!(upper.contains_key(&short));
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
use super::*;
use crate::assert_matches;
#[test]
fn test_user_agent() {
assert!(UserAgent::from_str("/radicle:1.0.0/").is_ok());
assert!(UserAgent::from_str("/radicle:1.0.0/heartwood:0.9/").is_ok());
assert!(UserAgent::from_str("/radicle:1.0.0/heartwood:0.9/rust:1.77/").is_ok());
assert!(UserAgent::from_str("/radicle:1.0.0-rc.1/").is_ok());
assert!(UserAgent::from_str("/radicle:1.0.0-rc.1/").is_ok());
assert!(UserAgent::from_str("/radicle:@a.b.c/").is_ok());
assert!(UserAgent::from_str("/radicle/").is_ok());
assert!(UserAgent::from_str("/rad/icle/").is_ok());
assert!(UserAgent::from_str("/rad:ic/le/").is_ok());
assert!(UserAgent::from_str("/:/").is_err());
assert!(UserAgent::from_str("//").is_err());
assert!(UserAgent::from_str("").is_err());
assert!(UserAgent::from_str("radicle:1.0.0/").is_err());
assert!(UserAgent::from_str("/radicle:1.0.0").is_err());
assert!(UserAgent::from_str("/radi cle:1.0/").is_err());
assert!(UserAgent::from_str("/radi\ncle:1.0/").is_err());
}
#[test]
fn test_alias() {
assert!(Alias::from_str("cloudhead").is_ok());
assert!(Alias::from_str("cloud-head").is_ok());
assert!(Alias::from_str("cl0ud.h3ad$__").is_ok());
assert!(Alias::from_str("©loudhèâd").is_ok());
assert!(Alias::from_str("").is_err());
assert!(Alias::from_str(" ").is_err());
assert!(Alias::from_str("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").is_err());
assert!(Alias::from_str("cloud\0head").is_err());
assert!(Alias::from_str("cloud head").is_err());
assert!(Alias::from_str("cloudhead\n").is_err());
}
#[test]
fn test_address() {
assert!(Address::from_str("127.0.0.1:8776").is_ok());
assert!(Address::from_str("[::1]:8776").is_ok());
assert!(Address::from_str("[::ffff:127.0.0.1]:8776").is_ok());
assert!(Address::from_str("localhost:8776").is_ok());
assert!(Address::from_str("::1:8776").is_ok());
assert!(Address::from_str("").is_err());
assert!(Address::from_str(":").is_err());
assert!(Address::from_str("127.0.0.1").is_err());
assert!(Address::from_str("127.0.0.1:xyz").is_err());
assert!(Address::from_str("[invalid]:8776").is_err());
assert!(Address::from_str("[127.0.0.1]:8776").is_err());
}
#[test]
fn test_command_result() {
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct Test {
value: u32,
}
assert_eq!(json::to_string(&CommandResult::Okay(true)).unwrap(), "true");
assert_eq!(
json::to_string(&CommandResult::Okay(Test { value: 42 })).unwrap(),
"{\"value\":42}"
);
assert_eq!(
json::from_str::<CommandResult<Test>>("{\"value\":42}").unwrap(),
CommandResult::Okay(Test { value: 42 })
);
assert_eq!(json::to_string(&CommandResult::ok()).unwrap(), "{}");
assert_eq!(
json::to_string(&CommandResult::updated(true)).unwrap(),
"{\"updated\":true}"
);
assert_eq!(
json::to_string(&CommandResult::error(io::Error::from(
io::ErrorKind::NotFound
)))
.unwrap(),
"{\"error\":\"entity not found\"}"
);
json::from_str::<CommandResult<State>>(
&serde_json::to_string(&CommandResult::Okay(State::Connected {
since: LocalTime::now(),
ping: Default::default(),
latencies: VecDeque::default(),
stable: false,
}))
.unwrap(),
)
.unwrap();
assert_matches!(
json::from_str::<CommandResult<State>>(
r#"{"connected":{"since":1699636852107,"fetching":[]}}"#
),
Ok(CommandResult::Okay(_))
);
assert_matches!(
json::from_str::<CommandResult<Seeds>>(
r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
),
Ok(CommandResult::Okay(_))
);
}
}