1#![allow(clippy::type_complexity)]
2#![allow(clippy::collapsible_if)]
3mod features;
4
5pub mod address;
6pub mod command;
7pub mod config;
8pub mod db;
9pub mod device;
10pub mod events;
11pub mod notifications;
12pub mod policy;
13pub mod refs;
14pub mod routing;
15pub mod seed;
16pub mod sync;
17pub mod timestamp;
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
20use std::fmt::Display;
21use std::io::{BufRead, BufReader};
22use std::marker::PhantomData;
23use std::net::IpAddr;
24use std::net::Ipv6Addr;
25use std::ops::{ControlFlow, Deref};
26use std::path::{Path, PathBuf};
27use std::str::FromStr;
28use std::{fmt, io, net, thread, time};
29
30#[cfg(unix)]
31use std::os::unix::net::UnixStream;
32#[cfg(windows)]
33use uds_windows::UnixStream;
34
35use amplify::WrapperMut;
36use cyphernet::addr::{AddrParseError, NetAddr};
37use localtime::{LocalDuration, LocalTime};
38use serde::de::DeserializeOwned;
39use serde::{Deserialize, Serialize};
40use serde_json as json;
41
42use crate::crypto::PublicKey;
43use crate::git;
44use crate::identity::RepoId;
45use crate::profile;
46use crate::storage::refs::{FeatureLevel, RefsAt};
47use crate::storage::RefUpdate;
48
49pub use address::KnownAddress;
50pub use command::{Command, CommandResult, ConnectOptions, Success, DEFAULT_TIMEOUT};
51pub use config::Config;
52pub use cyphernet::addr::{HostName, PeerAddr, PeerAddrParseError};
53pub use db::Database;
54pub use events::{Event, Events};
55pub use features::Features;
56pub use radicle_core::NodeId;
57pub use seed::SyncedAt;
58pub use timestamp::Timestamp;
59
/// Protocol version number.
pub const PROTOCOL_VERSION: u8 = 1;
/// Default port number.
// NOTE(review): presumably the port the node listens on for peers — confirm.
pub const DEFAULT_PORT: u16 = 8776;
/// Default timeout of five seconds for event subscriptions.
// NOTE(review): presumably meant to be passed to `Handle::subscribe` — confirm with callers.
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
/// Maximum length of an [`Alias`], in bytes, as enforced by its `FromStr` implementation.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty value at or above which [`Penalty::is_connect_threshold_reached`] returns `true`.
pub const PENALTY_CONNECT_THRESHOLD: u8 = 32;
/// Penalty value at or above which [`Penalty::is_ban_threshold_reached`] returns `true`.
pub const PENALTY_BAN_THRESHOLD: u8 = 64;
/// File name of the node database.
pub const NODE_DB_FILE: &str = "node.db";
/// File name of the policies database.
pub const POLICIES_DB_FILE: &str = "policies.db";
/// File name of the notifications database.
pub const NOTIFICATIONS_DB_FILE: &str = "notifications.db";
79
/// Ping state tracked for a connected peer (see [`State::Connected`]).
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PingState {
    /// No ping state recorded. This is the default.
    #[default]
    None,
    /// Waiting for a response.
    AwaitingResponse {
        // NOTE(review): presumably the expected length of the response payload — confirm.
        len: u16,
        // NOTE(review): presumably the time since which we have been waiting — confirm.
        since: LocalTime,
    },
    // NOTE(review): presumably set once a valid response was received — confirm.
    Ok,
}
95
/// Connection state of a peer session.
///
/// Serialized with camelCase names. The `ping`, `latencies` and `stable`
/// fields of [`State::Connected`] are skipped by serde.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum State {
    /// Initial state, displayed as `initial`.
    Initial,
    /// Displayed as `attempted`.
    // NOTE(review): presumably a connection attempt is in progress — confirm.
    Attempted,
    /// The peer is connected.
    #[serde(rename_all = "camelCase")]
    Connected {
        // NOTE(review): presumably the time the connection was established — confirm.
        since: LocalTime,
        /// Ping state. Not serialized.
        #[serde(skip)]
        ping: PingState,
        /// Recorded latencies. Not serialized.
        #[serde(skip)]
        latencies: VecDeque<LocalDuration>,
        /// Stability flag. Not serialized.
        #[serde(skip)]
        stable: bool,
    },
    /// The peer is disconnected.
    #[serde(rename_all = "camelCase")]
    Disconnected {
        // NOTE(review): presumably the time of disconnection — confirm.
        since: LocalTime,
        // NOTE(review): presumably the earliest time a reconnection is attempted — confirm.
        retry_at: LocalTime,
    },
}
129
130impl State {
131 pub fn is_connected(&self) -> bool {
133 matches!(self, Self::Connected { .. })
134 }
135}
136
137impl fmt::Display for State {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 match self {
140 Self::Initial => {
141 write!(f, "initial")
142 }
143 Self::Attempted { .. } => {
144 write!(f, "attempted")
145 }
146 Self::Connected { .. } => {
147 write!(f, "connected")
148 }
149 Self::Disconnected { .. } => {
150 write!(f, "disconnected")
151 }
152 }
153 }
154}
155
/// Severity levels, with explicit discriminants `0`, `1` and `8`.
// NOTE(review): presumably the discriminant is the number of penalty points
// applied for a misbehavior of that severity — confirm with the users of this enum.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Severity {
    Low = 0,
    Medium = 1,
    High = 8,
}
163
164#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
166pub struct Penalty(u8);
167
168impl Penalty {
169 pub fn is_connect_threshold_reached(&self) -> bool {
172 self.0 >= PENALTY_CONNECT_THRESHOLD
173 }
174
175 pub fn is_ban_threshold_reached(&self) -> bool {
176 self.0 >= PENALTY_BAN_THRESHOLD
177 }
178}
179
/// Synchronization status, serialized as an internally tagged enum
/// (tag `status`) with camelCase names.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[serde(tag = "status")]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum SyncStatus {
    /// In sync.
    #[serde(rename_all = "camelCase")]
    Synced {
        // NOTE(review): presumably the point at which the seed was synced — confirm.
        at: SyncedAt,
    },
    /// Out of sync.
    #[serde(rename_all = "camelCase")]
    OutOfSync {
        // NOTE(review): presumably our own (local) sync point — confirm.
        local: SyncedAt,
        // NOTE(review): presumably the remote's sync point — confirm.
        remote: SyncedAt,
    },
}
201
202impl Ord for SyncStatus {
203 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
204 match (self, other) {
205 (Self::Synced { at: left }, Self::Synced { at: right }) => left.cmp(right),
206 (Self::Synced { at }, Self::OutOfSync { remote, .. }) => at.cmp(remote),
207 (Self::OutOfSync { remote, .. }, Self::Synced { at }) => remote.cmp(at),
208 (Self::OutOfSync { remote: left, .. }, Self::OutOfSync { remote: right, .. }) => {
209 left.cmp(right)
210 }
211 }
212 }
213}
214
215impl PartialOrd for SyncStatus {
216 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
217 Some(self.cmp(other))
218 }
219}
220
/// A user agent string.
///
/// Values built through [`FromStr`] are validated; see that implementation
/// for the accepted format. The default value is `/radicle/`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct UserAgent(String);
224
225impl UserAgent {
226 pub fn as_str(&self) -> &str {
228 self.0.as_str()
229 }
230}
231
232impl Default for UserAgent {
233 fn default() -> Self {
234 UserAgent(String::from("/radicle/"))
235 }
236}
237
238impl std::fmt::Display for UserAgent {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 self.0.fmt(f)
241 }
242}
243
244impl FromStr for UserAgent {
245 type Err = String;
246
247 fn from_str(input: &str) -> Result<Self, Self::Err> {
248 let reserved = ['/', ':'];
249
250 if input.len() > 64 {
251 return Err(input.to_owned());
252 }
253 let Some(s) = input.strip_prefix('/') else {
254 return Err(input.to_owned());
255 };
256 let Some(s) = s.strip_suffix('/') else {
257 return Err(input.to_owned());
258 };
259 if s.is_empty() {
260 return Err(input.to_owned());
261 }
262 if s.split('/').all(|segment| {
263 if let Some((client, version)) = segment.split_once(':') {
264 if client.is_empty() || version.is_empty() {
265 false
266 } else {
267 let client = client
268 .chars()
269 .all(|c| c.is_ascii_graphic() && !reserved.contains(&c));
270 let version = version
271 .chars()
272 .all(|c| c.is_ascii_graphic() || !reserved.contains(&c));
273 client && version
274 }
275 } else {
276 true
277 }
278 }) {
279 Ok(Self(input.to_owned()))
280 } else {
281 Err(input.to_owned())
282 }
283 }
284}
285
286impl AsRef<str> for UserAgent {
287 fn as_ref(&self) -> &str {
288 self.0.as_str()
289 }
290}
291
/// A node alias.
///
/// (De)serialized as a plain string; deserialization goes through
/// `TryFrom<String>`, so the validation of [`Alias::from_str`] applies.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Alias(
    // The schema pattern excludes C0/C1 control characters and whitespace and
    // caps the length at 32.
    #[cfg_attr(
        feature = "schemars",
        schemars(regex(pattern = r"^[^\x00-\x1F\x7F-\x9F\s]{0,32}$"), length(max = 32))
    )]
    String,
);
311
312impl Alias {
313 pub fn new(alias: impl ToString) -> Self {
315 let alias = alias.to_string();
316
317 match Self::from_str(&alias) {
318 Ok(a) => a,
319 Err(e) => panic!("Alias::new: {e}"),
320 }
321 }
322
323 pub fn as_str(&self) -> &str {
325 self.0.as_str()
326 }
327}
328
329impl From<Alias> for String {
330 fn from(value: Alias) -> Self {
331 value.0
332 }
333}
334
335impl From<&NodeId> for Alias {
336 fn from(nid: &NodeId) -> Self {
337 Alias(nid.to_string())
338 }
339}
340
341impl fmt::Display for Alias {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 self.0.fmt(f)
344 }
345}
346
347impl Deref for Alias {
348 type Target = str;
349
350 fn deref(&self) -> &Self::Target {
351 &self.0
352 }
353}
354
355impl AsRef<str> for Alias {
356 fn as_ref(&self) -> &str {
357 self.0.as_str()
358 }
359}
360
361impl From<&Alias> for [u8; 32] {
362 fn from(input: &Alias) -> [u8; 32] {
363 let mut alias = [0u8; 32];
364
365 alias[..input.len()].copy_from_slice(input.as_bytes());
366 alias
367 }
368}
369
/// Reasons a string is rejected by [`Alias::from_str`].
#[derive(thiserror::Error, Debug)]
pub enum AliasError {
    /// The input string was empty.
    #[error("alias cannot be empty")]
    Empty,
    /// The input was longer than [`MAX_ALIAS_LENGTH`] bytes.
    #[error("alias cannot be greater than {MAX_ALIAS_LENGTH} bytes")]
    MaxBytesExceeded,
    /// The input contained a control or whitespace character.
    #[error("alias cannot contain whitespace or control characters")]
    InvalidCharacter,
}
379
380impl FromStr for Alias {
381 type Err = AliasError;
382
383 fn from_str(s: &str) -> Result<Self, Self::Err> {
384 if s.is_empty() {
385 return Err(AliasError::Empty);
386 }
387 if s.chars().any(|c| c.is_control() || c.is_whitespace()) {
388 return Err(AliasError::InvalidCharacter);
389 }
390 if s.len() > MAX_ALIAS_LENGTH {
391 return Err(AliasError::MaxBytesExceeded);
392 }
393 Ok(Self(s.to_owned()))
394 }
395}
396
397impl TryFrom<String> for Alias {
398 type Error = AliasError;
399
400 fn try_from(value: String) -> Result<Self, Self::Error> {
401 Alias::from_str(&value)
402 }
403}
404
405impl TryFrom<&sqlite::Value> for Alias {
406 type Error = sqlite::Error;
407
408 fn try_from(value: &sqlite::Value) -> Result<Self, Self::Error> {
409 match value {
410 sqlite::Value::String(s) => Self::from_str(s).map_err(|e| sqlite::Error {
411 code: None,
412 message: Some(e.to_string()),
413 }),
414 _ => Err(sqlite::Error {
415 code: None,
416 message: Some(format!(
417 "sql: invalid type {:?} for alias, expected {:?}",
418 value.kind(),
419 sqlite::Type::String
420 )),
421 }),
422 }
423 }
424}
425
/// A network address: a host name paired with a port, wrapping
/// `NetAddr<HostName>`.
///
/// (De)serialized as a string: parsing goes through `TryFrom<String>`
/// (and thus [`FromStr`]), and output uses the [`Display`] form.
#[derive(Clone, Eq, PartialEq, Debug, Hash, From, Wrapper, WrapperMut, Serialize, Deserialize)]
#[wrapper(Deref)]
#[wrapper_mut(DerefMut)]
#[serde(try_from = "String", into = "String")]
#[cfg_attr(
    feature = "schemars",
    derive(schemars::JsonSchema),
    schemars(description = "\
    An IP address, or a DNS name, or a Tor onion name, followed by the symbol ':', \
    followed by a TCP port number.",
    extend(
        "examples" = [
            "xmrhfasfg5suueegrnc4gsgyi2tyclcy5oz7f5drnrodmdtob6t2ioyd.onion:8776",
            "seed.example.com:8776",
            "192.0.2.0:31337",
        ],
        "pattern" = "^.+:((6553[0-5])|(655[0-2][0-9])|(65[0-4][0-9]{2})|(6[0-4][0-9]{3})|([1-5][0-9]{4})|([0-5]{0,5})|([0-9]{1,4}))$",
    ),
))]
pub struct Address(NetAddr<HostName>);
447
448impl Address {
449 pub fn is_local(&self) -> bool {
451 match &self.0.host {
452 HostName::Ip(ip) => address::is_local(ip),
453 HostName::Dns(name) => {
454 let name = name.strip_suffix(".").unwrap_or(name);
455
456 name.ends_with(".localhost") || name == "localhost"
459 }
460 _ => false,
461 }
462 }
463
464 pub fn is_routable(&self) -> bool {
466 match self.0.host {
467 HostName::Ip(ip) => address::is_routable(&ip),
468 HostName::Dns(_) => !self.is_local(),
469 _ => true,
470 }
471 }
472
473 pub fn host(&self) -> &HostName {
475 &self.0.host
476 }
477
478 pub fn is_onion(&self) -> bool {
480 match self.0.host {
481 HostName::Tor(_) => true,
482 _ => false,
483 }
484 }
485
486 pub fn port(&self) -> u16 {
488 self.0.port
489 }
490
491 pub fn display_compact(&self) -> impl Display {
492 let host = match self.host() {
493 HostName::Ip(IpAddr::V4(ip)) => ip.to_string(),
494 HostName::Ip(IpAddr::V6(ip)) => format!("[{ip}]"),
495 HostName::Dns(dns) => dns.clone(),
496 HostName::Tor(onion) => {
497 let onion = onion.to_string();
498 let start = onion.chars().take(8).collect::<String>();
499 let end = onion
500 .chars()
501 .skip(onion.len() - 8 - ".onion".len())
502 .collect::<String>();
503 format!("{start}…{end}")
504 }
505 _ => unreachable!(),
506 };
507
508 let port = self.port().to_string();
509
510 format!("{host}:{port}")
511 }
512}
513
514impl Display for Address {
515 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516 match self.host() {
517 HostName::Ip(IpAddr::V6(ip)) => {
518 write!(f, "[{ip}]:{}", self.port())
519 }
520 _ => self.0.fmt(f),
521 }
522 }
523}
524
525impl TryFrom<String> for Address {
526 type Error = <Self as FromStr>::Err;
527
528 fn try_from(value: String) -> Result<Self, Self::Error> {
529 value.parse()
530 }
531}
532
533impl From<Address> for String {
534 fn from(value: Address) -> Self {
535 value.to_string()
536 }
537}
538
539impl FromStr for Address {
540 type Err = AddrParseError;
541
542 fn from_str(s: &str) -> Result<Self, Self::Err> {
543 let (host, port) = s.rsplit_once(':').ok_or(AddrParseError::PortAbsent)?;
544
545 let host = if let Some(host) = host
546 .strip_prefix('[')
547 .and_then(|host| host.strip_suffix(']'))
548 {
549 HostName::Ip(host.parse::<Ipv6Addr>()?.into())
550 } else {
551 host.parse().map(|host| match host {
553 HostName::Ip(IpAddr::V6(addr)) => {
554 log::warn!("Address format will change in the future. '{s}' should be changed to '[{addr}]:{port}' to stay compatible. Refer to RFC 5926, Sec. 6 as well as RFC 3986, Sec. D.1. and RFC 2732, Sec. 2.");
555 host
556 },
557 host => host,
558 })?
559 };
560
561 let port = port.parse().map_err(|_| AddrParseError::InvalidPort)?;
562
563 Ok(Self(NetAddr::new(host, port)))
564 }
565}
566
567impl cyphernet::addr::Host for Address {
568 fn requires_proxy(&self) -> bool {
569 self.0.requires_proxy()
570 }
571}
572
573impl cyphernet::addr::Addr for Address {
574 fn port(&self) -> u16 {
575 self.0.port()
576 }
577}
578
579impl From<net::SocketAddr> for Address {
580 fn from(addr: net::SocketAddr) -> Self {
581 Address(NetAddr {
582 host: HostName::Ip(addr.ip()),
583 port: addr.port(),
584 })
585 }
586}
587
588impl From<Address> for HostName {
589 fn from(addr: Address) -> Self {
590 addr.0.host
591 }
592}
593
/// Direction of a link between two peers, serialized in camelCase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum Link {
    /// Outgoing connection.
    Outbound,
    /// Incoming connection.
    Inbound,
}
604
605impl Link {
606 pub fn is_outbound(&self) -> bool {
608 matches!(self, Self::Outbound)
609 }
610
611 pub fn is_inbound(&self) -> bool {
613 matches!(self, Self::Inbound)
614 }
615}
616
617impl std::fmt::Display for Link {
618 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619 match self {
620 Link::Outbound => write!(f, "outbound"),
621 Link::Inbound => write!(f, "inbound"),
622 }
623 }
624}
625
/// A session with a peer, as reported by the node.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Session {
    /// Node id of the peer.
    pub nid: NodeId,
    /// Direction of the link.
    pub link: Link,
    /// Address of the peer.
    pub addr: Address,
    /// Connection state of the session.
    pub state: State,
}
635
636impl Session {
637 pub fn is_connected(&self) -> bool {
639 self.state.is_connected()
640 }
641}
642
/// A seed node, serialized in camelCase.
///
/// The `state` and `sync` fields are omitted from the output when `None`
/// and default to `None` when absent from the input.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seed {
    /// Node id of the seed.
    pub nid: NodeId,
    /// Known addresses of the seed.
    pub addrs: Vec<KnownAddress>,
    /// Connection state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state: Option<State>,
    /// Sync status, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sync: Option<SyncStatus>,
}
659
660impl Seed {
661 pub fn is_connected(&self) -> bool {
663 matches!(self.state, Some(State::Connected { .. }))
664 }
665
666 pub fn is_synced(&self) -> bool {
668 matches!(self.sync, Some(SyncStatus::Synced { .. }))
669 }
670
671 pub fn new(
672 nid: NodeId,
673 addrs: Vec<KnownAddress>,
674 state: Option<State>,
675 sync: Option<SyncStatus>,
676 ) -> Self {
677 Self {
678 nid,
679 addrs,
680 state,
681 sync,
682 }
683 }
684}
685
/// A set of seeds keyed by node id, backed by an `AddressBook`.
///
/// (De)serialized as a plain `Vec<Seed>`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(into = "Vec<Seed>", from = "Vec<Seed>")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seeds(
    #[cfg_attr(feature = "schemars", schemars(with = "Vec<Seed>"))]
    address::AddressBook<NodeId, Seed>,
);
695
696impl Seeds {
697 pub fn new(rng: fastrand::Rng) -> Self {
699 Self(address::AddressBook::new(rng))
700 }
701
702 pub fn insert(&mut self, seed: Seed) {
704 self.0.insert(seed.nid, seed);
705 }
706
707 pub fn contains(&self, nid: &NodeId) -> bool {
709 self.0.contains_key(nid)
710 }
711
712 pub fn len(&self) -> usize {
714 self.0.len()
715 }
716
717 pub fn is_empty(&self) -> bool {
719 self.0.is_empty()
720 }
721
722 pub fn partition(&self) -> (Vec<Seed>, Vec<Seed>) {
725 self.iter().cloned().partition(|s| s.is_connected())
726 }
727
728 pub fn connected(&self) -> impl Iterator<Item = &Seed> {
730 self.iter().filter(|s| s.is_connected())
731 }
732
733 pub fn iter(&self) -> impl Iterator<Item = &Seed> {
735 self.0.shuffled().map(|(_, v)| v)
736 }
737
738 pub fn is_connected(&self, nid: &NodeId) -> bool {
740 self.0.get(nid).is_some_and(|s| s.is_connected())
741 }
742
743 pub fn with(self, rng: fastrand::Rng) -> Self {
745 Self(self.0.with(rng))
746 }
747}
748
749impl From<Seeds> for Vec<Seed> {
750 fn from(seeds: Seeds) -> Vec<Seed> {
751 seeds.0.into_shuffled().map(|(_, v)| v).collect()
752 }
753}
754
755impl From<Vec<Seed>> for Seeds {
756 fn from(other: Vec<Seed>) -> Seeds {
757 Seeds(address::AddressBook::from_iter(
758 other.into_iter().map(|s| (s.nid, s)),
759 ))
760 }
761}
762
/// Outcome of a fetch, serialized as an internally tagged enum
/// (tag `status`) with camelCase names.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum FetchResult {
    /// The fetch succeeded.
    Success {
        /// Reference updates produced by the fetch.
        updated: Vec<RefUpdate>,
        // NOTE(review): presumably the namespaces (node ids) that were fetched — confirm.
        namespaces: HashSet<NodeId>,
        // NOTE(review): presumably whether the fetch was an initial clone — confirm.
        clone: bool,
    },
    /// The fetch failed.
    Failed {
        /// Human-readable failure reason.
        reason: String,
    },
}
777
778impl FetchResult {
779 pub fn is_success(&self) -> bool {
780 matches!(self, FetchResult::Success { .. })
781 }
782
783 pub fn success(self) -> Option<(Vec<RefUpdate>, HashSet<NodeId>)> {
784 match self {
785 Self::Success {
786 updated,
787 namespaces,
788 ..
789 } => Some((updated, namespaces)),
790 _ => None,
791 }
792 }
793
794 pub fn find_updated(&self, name: &git::fmt::RefStr) -> Option<RefUpdate> {
795 let updated = match self {
796 Self::Success { updated, .. } => Some(updated),
797 _ => None,
798 }?;
799 updated.iter().find(|up| up.name() == name).cloned()
800 }
801}
802
803impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>> for FetchResult {
804 fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>) -> Self {
805 match value {
806 Ok((updated, namespaces, clone)) => Self::Success {
807 updated,
808 namespaces,
809 clone,
810 },
811 Err(err) => Self::Failed {
812 reason: err.to_string(),
813 },
814 }
815 }
816}
817
/// An ordered list of fetch results, each paired with a node id.
/// Entries are kept in insertion order.
#[derive(Clone, Debug, Default)]
pub struct FetchResults(Vec<(NodeId, FetchResult)>);
821
822impl FetchResults {
823 pub fn push(&mut self, nid: NodeId, result: FetchResult) {
825 self.0.push((nid, result));
826 }
827
828 pub fn contains(&self, nid: &NodeId) -> bool {
830 self.0.iter().any(|(n, _)| n == nid)
831 }
832
833 pub fn get(&self, nid: &NodeId) -> Option<&FetchResult> {
835 self.0.iter().find(|(n, _)| n == nid).map(|(_, r)| r)
836 }
837
838 pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &FetchResult)> {
840 self.0.iter().map(|(nid, r)| (nid, r))
841 }
842
843 pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate], HashSet<NodeId>)> {
845 self.0.iter().filter_map(|(nid, r)| {
846 if let FetchResult::Success {
847 updated,
848 namespaces,
849 ..
850 } = r
851 {
852 Some((nid, updated.as_slice(), namespaces.clone()))
853 } else {
854 None
855 }
856 })
857 }
858
859 pub fn failed(&self) -> impl Iterator<Item = (&NodeId, &str)> {
861 self.0.iter().filter_map(|(nid, r)| {
862 if let FetchResult::Failed { reason } = r {
863 Some((nid, reason.as_str()))
864 } else {
865 None
866 }
867 })
868 }
869}
870
871impl From<Vec<(NodeId, FetchResult)>> for FetchResults {
872 fn from(value: Vec<(NodeId, FetchResult)>) -> Self {
873 Self(value)
874 }
875}
876
877impl Deref for FetchResults {
878 type Target = [(NodeId, FetchResult)];
879
880 fn deref(&self) -> &Self::Target {
881 self.0.as_slice()
882 }
883}
884
885impl IntoIterator for FetchResults {
886 type Item = (NodeId, FetchResult);
887 type IntoIter = std::vec::IntoIter<(NodeId, FetchResult)>;
888
889 fn into_iter(self) -> Self::IntoIter {
890 self.0.into_iter()
891 }
892}
893
/// Errors returned by [`Node`] when talking to the node over its control
/// socket.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An I/O error.
    #[error("i/o: {0}")]
    Io(#[from] io::Error),
    /// An error message reported by the node.
    #[error("node: {0}")]
    Node(String),
    /// Reading from the control socket timed out.
    #[error("timed out reading from control socket")]
    TimedOut,
    /// The control socket at the given path could not be opened.
    #[error("failed to open node control socket {0:?} ({1})")]
    Connect(PathBuf, io::ErrorKind),
    /// The node answered a command with an error result.
    #[error("command error: {reason}")]
    Command { reason: String },
    /// A response line could not be decoded as JSON.
    #[error("received invalid json `{response}` in response to command: {error}")]
    InvalidJson {
        /// The raw response line.
        response: String,
        /// The decoding error.
        error: json::Error,
    },
    /// The response stream ended before any line was received.
    #[error("received empty response for command")]
    EmptyResponse,
}
915
916impl Error {
917 pub fn is_connection_err(&self) -> bool {
919 matches!(self, Self::Connect { .. })
920 }
921}
922
/// Result of a connect command, serialized as an internally tagged enum
/// (tag `status`) with camelCase names.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum ConnectResult {
    /// The peer is connected.
    Connected,
    /// The peer is disconnected, with a human-readable reason.
    Disconnected { reason: String },
}
930
/// Interface for controlling and querying a node.
pub trait Handle: Clone + Sync + Send {
    /// Collection type returned by [`Handle::sessions`].
    type Sessions;
    /// Event stream type returned by [`Handle::subscribe`].
    type Events: IntoIterator<Item = Self::Event>;
    /// Item type yielded by [`Handle::Events`].
    type Event;
    /// Error type returned by the fallible methods.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Get the node's id.
    fn nid(&self) -> Result<NodeId, Self::Error>;
    /// Check whether the node is running.
    fn is_running(&self) -> bool;
    /// Get the socket addresses the node is listening on.
    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error>;
    /// Get the node's configuration.
    fn config(&self) -> Result<config::Config, Self::Error>;
    /// Connect to the peer `node` at `addr`, using the given options.
    fn connect(
        &mut self,
        node: NodeId,
        addr: Address,
        opts: ConnectOptions,
    ) -> Result<ConnectResult, Self::Error>;
    /// Disconnect from the peer `node`.
    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error>;

    /// Look up the seeds of a repository.
    ///
    /// Equivalent to [`Handle::seeds_for`] with the node's own id
    /// ([`Handle::nid`]) as the only namespace.
    #[deprecated(note = "use `seeds_for` instead")]
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
        self.seeds_for(id, [self.nid()?])
    }

    /// Look up the seeds of a repository for the given namespaces.
    fn seeds_for(
        &mut self,
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<Seeds, Self::Error>;

    /// Fetch the repository `id` from the peer `from`, with a timeout.
    // NOTE(review): `signed_references_minimum_feature_level` is passed
    // through to the node; its exact semantics are not visible here — confirm.
    fn fetch(
        &mut self,
        id: RepoId,
        from: NodeId,
        timeout: time::Duration,
        signed_references_minimum_feature_level: Option<FeatureLevel>,
    ) -> Result<FetchResult, Self::Error>;
    /// Seed the repository `id` with the given scope.
    // NOTE(review): the returned `bool` is presumably whether the policy was
    // updated (the `Node` implementation returns `Success::updated`) — confirm.
    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Self::Error>;
    /// Follow the node `id`, optionally assigning it an alias.
    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>;
    /// Block the node `id`.
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error>;
    /// Stop seeding the repository `id`.
    fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
    /// Stop following the node `id`.
    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error>;

    /// Announce the references of a repository.
    ///
    /// Equivalent to [`Handle::announce_refs_for`] with the node's own id
    /// ([`Handle::nid`]) as the only namespace.
    #[deprecated(note = "use `announce_refs_for` instead")]
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
        self.announce_refs_for(id, [self.nid()?])
    }

    /// Announce the references of a repository for the given namespaces.
    fn announce_refs_for(
        &mut self,
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<RefsAt, Self::Error>;

    /// Announce the node's inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
    /// Add the repository `rid` to the node's inventory.
    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Self::Error>;
    /// Shut the node down, consuming the handle.
    fn shutdown(self) -> Result<(), Self::Error>;
    /// Query the node's sessions.
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
    /// Query the session with the peer `node`, if there is one.
    fn session(&self, node: NodeId) -> Result<Option<Session>, Self::Error>;
    /// Subscribe to node events, with a timeout.
    // NOTE(review): in the `Node` implementation the timeout is the read
    // timeout applied before each event is read — confirm for other implementors.
    fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
    /// Get debug information from the node, as JSON.
    fn debug(&self) -> Result<json::Value, Self::Error>;
}
1022
/// Iterator that reads newline-delimited JSON responses from a control
/// socket and decodes each line into a `T`.
pub struct LineIter<T> {
    /// Buffered socket the lines are read from.
    stream: BufReader<UnixStream>,
    /// Read timeout applied to the socket before each line is read.
    timeout: time::Duration,
    /// Ties the iterator to the item type without storing a `T`.
    witness: PhantomData<T>,
}
1032
1033impl<T: DeserializeOwned> Iterator for LineIter<T> {
1034 type Item = Result<T, Error>;
1035
1036 fn next(&mut self) -> Option<Self::Item> {
1037 let mut l = String::new();
1038
1039 self.stream
1040 .get_ref()
1041 .set_read_timeout(Some(self.timeout))
1042 .ok();
1043
1044 match self.stream.read_line(&mut l) {
1045 Ok(0) => None,
1046 Ok(_) => {
1047 let result: CommandResult<T> = match json::from_str(&l) {
1048 Err(e) => {
1049 return Some(Err(Error::InvalidJson {
1050 response: l.clone(),
1051 error: e,
1052 }))
1053 }
1054 Ok(result) => result,
1055 };
1056 match result {
1057 CommandResult::Okay(result) => Some(Ok(result)),
1058 CommandResult::Error { reason } => Some(Err(Error::Command { reason })),
1059 }
1060 }
1061 Err(e) => match e.kind() {
1062 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Some(Err(Error::TimedOut)),
1063 _ => Some(Err(Error::Io(e))),
1064 },
1065 }
1066 }
1067}
1068
/// Client for a node, talking to it over a Unix domain control socket.
#[derive(Debug, Clone)]
pub struct Node {
    /// Path of the control socket.
    socket: PathBuf,
}
1074
1075impl Node {
1076 pub fn new<P: AsRef<Path>>(path: P) -> Self {
1078 Self {
1079 socket: path.as_ref().to_path_buf(),
1080 }
1081 }
1082
1083 pub fn call<T: DeserializeOwned + Send + 'static>(
1085 &self,
1086 cmd: Command,
1087 timeout: time::Duration,
1088 ) -> Result<LineIter<T>, Error> {
1089 let mut stream = UnixStream::connect(&self.socket)
1090 .map_err(|e| Error::Connect(self.socket.clone(), e.kind()))?;
1091 cmd.to_writer(&mut stream)?;
1092 Ok(LineIter {
1093 stream: BufReader::new(stream),
1094 timeout,
1095 witness: PhantomData,
1096 })
1097 }
1098
1099 pub fn announce(
1103 &mut self,
1104 rid: RepoId,
1105 namespaces: impl IntoIterator<Item = PublicKey>,
1106 timeout: time::Duration,
1107 mut announcer: sync::Announcer,
1108 mut report: impl FnMut(&NodeId, sync::announce::Progress),
1109 ) -> Result<sync::AnnouncerResult, Error> {
1110 let mut events = self.subscribe(timeout)?;
1111 let refs = self.announce_refs_for(rid, namespaces)?;
1112
1113 let started = time::Instant::now();
1114
1115 loop {
1116 let Some(e) = events.next() else {
1117 return Ok(announcer.timed_out());
1120 };
1121 let elapsed = started.elapsed();
1122 if elapsed >= timeout {
1123 return Ok(announcer.timed_out());
1124 }
1125 match e {
1126 Ok(Event::RefsSynced {
1127 remote,
1128 rid: rid_,
1129 at,
1130 }) if rid == rid_ && refs.at == at => {
1131 log::debug!(target: "radicle", "Received {e:?}");
1132 match announcer.synced_with(remote, elapsed) {
1133 ControlFlow::Continue(progress) => {
1134 report(&remote, progress);
1135 }
1136 ControlFlow::Break(finished) => {
1137 return Ok(finished.into());
1138 }
1139 }
1140 }
1141 Ok(_) => {}
1142
1143 Err(Error::TimedOut) => {
1144 return Ok(announcer.timed_out());
1145 }
1146 Err(e) => return Err(e),
1147 }
1148 announcer = match announcer.can_continue() {
1151 ControlFlow::Continue(cont) => cont,
1152 ControlFlow::Break(finished) => return Ok(finished.into()),
1153 };
1154 }
1155 }
1156}
1157
1158impl Handle for Node {
1161 type Sessions = Vec<Session>;
1162 type Events = LineIter<Event>;
1163 type Event = Result<Event, Error>;
1164 type Error = Error;
1165
1166 fn nid(&self) -> Result<NodeId, Error> {
1167 self.call::<NodeId>(Command::NodeId, DEFAULT_TIMEOUT)?
1168 .next()
1169 .ok_or(Error::EmptyResponse)?
1170 }
1171
1172 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Error> {
1173 self.call::<Vec<net::SocketAddr>>(Command::ListenAddrs, DEFAULT_TIMEOUT)?
1174 .next()
1175 .ok_or(Error::EmptyResponse)?
1176 }
1177
1178 fn is_running(&self) -> bool {
1179 let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
1180 return false;
1181 };
1182
1183 let Some(Ok(_)) = lines.next() else {
1184 return false;
1185 };
1186 true
1187 }
1188
1189 fn config(&self) -> Result<config::Config, Error> {
1190 self.call::<config::Config>(Command::Config, DEFAULT_TIMEOUT)?
1191 .next()
1192 .ok_or(Error::EmptyResponse)?
1193 }
1194
1195 fn connect(
1196 &mut self,
1197 nid: NodeId,
1198 addr: Address,
1199 opts: ConnectOptions,
1200 ) -> Result<ConnectResult, Error> {
1201 let timeout = opts.timeout;
1202 let result = self
1203 .call::<ConnectResult>(
1204 Command::Connect {
1205 addr: (nid, addr).into(),
1206 opts,
1207 },
1208 timeout,
1209 )?
1210 .next()
1211 .ok_or(Error::EmptyResponse)??;
1212
1213 Ok(result)
1214 }
1215
1216 fn disconnect(&mut self, nid: NodeId) -> Result<(), Self::Error> {
1217 self.call::<ConnectResult>(Command::Disconnect { nid }, DEFAULT_TIMEOUT)?
1218 .next()
1219 .ok_or(Error::EmptyResponse)??;
1220
1221 Ok(())
1222 }
1223
1224 fn seeds_for(
1225 &mut self,
1226 rid: RepoId,
1227 namespaces: impl IntoIterator<Item = PublicKey>,
1228 ) -> Result<Seeds, Error> {
1229 let seeds = self
1230 .call::<Seeds>(
1231 Command::SeedsFor {
1232 rid,
1233 namespaces: HashSet::from_iter(namespaces),
1234 },
1235 DEFAULT_TIMEOUT,
1236 )?
1237 .next()
1238 .ok_or(Error::EmptyResponse)??;
1239
1240 Ok(seeds.with(profile::env::rng()))
1241 }
1242
1243 fn fetch(
1244 &mut self,
1245 rid: RepoId,
1246 from: NodeId,
1247 timeout: time::Duration,
1248 signed_references_minimum_feature_level: Option<FeatureLevel>,
1249 ) -> Result<FetchResult, Error> {
1250 let result = self
1251 .call(
1252 Command::Fetch {
1253 rid,
1254 nid: from,
1255 timeout,
1256 signed_references_minimum_feature_level,
1257 },
1258 DEFAULT_TIMEOUT.max(timeout),
1259 )?
1260 .next()
1261 .ok_or(Error::EmptyResponse)??;
1262
1263 Ok(result)
1264 }
1265
1266 fn follow(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
1267 let mut lines = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
1268 let response = lines.next().ok_or(Error::EmptyResponse)??;
1269
1270 Ok(response.updated)
1271 }
1272
1273 fn block(&mut self, nid: NodeId) -> Result<bool, Error> {
1274 let mut lines = self.call::<Success>(Command::Block { nid }, DEFAULT_TIMEOUT)?;
1275 let response = lines.next().ok_or(Error::EmptyResponse)??;
1276
1277 Ok(response.updated)
1278 }
1279
1280 fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
1281 let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
1282 let response = lines.next().ok_or(Error::EmptyResponse)??;
1283
1284 Ok(response.updated)
1285 }
1286
1287 fn unfollow(&mut self, nid: NodeId) -> Result<bool, Error> {
1288 let mut lines = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
1289 let response = lines.next().ok_or(Error::EmptyResponse)??;
1290
1291 Ok(response.updated)
1292 }
1293
1294 fn unseed(&mut self, rid: RepoId) -> Result<bool, Error> {
1295 let mut lines = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
1296 let response = lines.next().ok_or(Error::EmptyResponse)??;
1297
1298 Ok(response.updated)
1299 }
1300
1301 fn announce_refs_for(
1302 &mut self,
1303 rid: RepoId,
1304 namespaces: impl IntoIterator<Item = PublicKey>,
1305 ) -> Result<RefsAt, Error> {
1306 let refs: RefsAt = self
1307 .call(
1308 Command::AnnounceRefsFor {
1309 rid,
1310 namespaces: HashSet::from_iter(namespaces),
1311 },
1312 DEFAULT_TIMEOUT,
1313 )?
1314 .next()
1315 .ok_or(Error::EmptyResponse)??;
1316
1317 Ok(refs)
1318 }
1319
1320 fn announce_inventory(&mut self) -> Result<(), Error> {
1321 for line in self.call::<Success>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? {
1322 line?;
1323 }
1324 Ok(())
1325 }
1326
1327 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1328 let mut lines = self.call::<Success>(Command::AddInventory { rid }, DEFAULT_TIMEOUT)?;
1329 let response = lines.next().ok_or(Error::EmptyResponse)??;
1330
1331 Ok(response.updated)
1332 }
1333
1334 fn subscribe(&self, timeout: time::Duration) -> Result<LineIter<Event>, Error> {
1335 self.call(Command::Subscribe, timeout)
1336 }
1337
1338 fn sessions(&self) -> Result<Self::Sessions, Error> {
1339 let sessions = self
1340 .call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)?
1341 .next()
1342 .ok_or(Error::EmptyResponse)??;
1343
1344 Ok(sessions)
1345 }
1346
1347 fn session(&self, nid: NodeId) -> Result<Option<Session>, Error> {
1348 let session = self
1349 .call::<Option<Session>>(Command::Session { nid }, DEFAULT_TIMEOUT)?
1350 .next()
1351 .ok_or(Error::EmptyResponse)??;
1352
1353 Ok(session)
1354 }
1355
1356 fn debug(&self) -> Result<json::Value, Self::Error> {
1357 let debug = self
1358 .call::<json::Value>(Command::Debug, DEFAULT_TIMEOUT)?
1359 .next()
1360 .ok_or(Error::EmptyResponse {})??;
1361
1362 Ok(debug)
1363 }
1364
1365 fn shutdown(self) -> Result<(), Error> {
1366 for line in self.call::<Success>(Command::Shutdown, DEFAULT_TIMEOUT)? {
1367 line?;
1368 }
1369 while self.is_running() {
1371 thread::sleep(time::Duration::from_secs(1));
1372 }
1373 Ok(())
1374 }
1375}
1376
1377pub trait AliasStore {
1379 fn alias(&self, nid: &NodeId) -> Option<Alias>;
1381
1382 fn reverse_lookup(&self, alias: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>>;
1388}
1389
1390impl AliasStore for HashMap<NodeId, Alias> {
1391 fn alias(&self, nid: &NodeId) -> Option<Alias> {
1392 self.get(nid).map(ToOwned::to_owned)
1393 }
1394
1395 fn reverse_lookup(&self, needle: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>> {
1396 self.iter()
1397 .fold(BTreeMap::new(), |mut result, (node, alias)| {
1398 if alias.contains(needle.as_str()) {
1399 let nodes = result.entry(alias.clone()).or_default();
1400 nodes.insert(*node);
1401 }
1402 result
1403 })
1404 }
1405}
1406
/// Reusable checks that any [`AliasStore`] implementation can be run against.
#[cfg(test)]
pub(crate) mod properties {
    use std::collections::BTreeSet;

    use crate::node::{Alias, NodeId};
    use crate::test::arbitrary;

    use super::AliasStore;

    /// Fixture for [`test_reverse_lookup`]: two aliases, each paired with the
    /// set of nodes that should be stored under it. `long` is built by
    /// appending a second generated alias to `short`, so `short` is always a
    /// prefix of `long`.
    pub struct AliasInput {
        short: (Alias, BTreeSet<NodeId>),
        long: (Alias, BTreeSet<NodeId>),
    }

    impl AliasInput {
        /// Generates a fresh fixture: three nodes for the short alias and two
        /// for the long one.
        pub fn new() -> Self {
            let short = arbitrary::gen::<Alias>(0);
            let suffix = arbitrary::gen::<Alias>(1);
            // Prefixing with `short` guarantees a lookup for `short` also
            // matches `long`.
            let long = Alias::new(short.to_string() + suffix.as_str());

            let short_ids = BTreeSet::from_iter(arbitrary::vec::<NodeId>(3));
            let long_ids = BTreeSet::from_iter(arbitrary::vec::<NodeId>(2));

            Self {
                short: (short, short_ids),
                long: (long, long_ids),
            }
        }

        /// The short alias together with its nodes.
        pub fn short(&self) -> &(Alias, BTreeSet<NodeId>) {
            &self.short
        }

        /// The long alias together with its nodes.
        pub fn long(&self) -> &(Alias, BTreeSet<NodeId>) {
            &self.long
        }
    }

    /// Checks `reverse_lookup` on a `store` that the caller has already
    /// populated with the entries described by `input`.
    ///
    /// Panics through the assertions below when the store does not behave as
    /// expected.
    pub fn test_reverse_lookup(store: &impl AliasStore, input: AliasInput) {
        let AliasInput {
            short: (short, short_ids),
            long: (long, long_ids),
        } = input;

        // Querying the short alias finds itself, and also the long alias,
        // which has the short one as a prefix.
        let by_short = store.reverse_lookup(&short);
        assert_eq!(by_short.get(&short), Some(&short_ids));
        assert_eq!(by_short.get(&long), Some(&long_ids));

        // Querying the long alias only finds itself: the short alias does not
        // contain it.
        let by_long = store.reverse_lookup(&long);
        assert_eq!(by_long.get(&short), None);
        assert_eq!(by_long.get(&long), Some(&long_ids));

        // A query that differs from the short alias only in ASCII case must
        // still find it.
        let mixed_case: String = short
            .as_str()
            .chars()
            .enumerate()
            .map(|(index, ch)| match index % 2 {
                0 => ch.to_ascii_uppercase(),
                _ => ch.to_ascii_lowercase(),
            })
            .collect();
        let by_mixed_case = store.reverse_lookup(&Alias::new(mixed_case));
        assert!(by_mixed_case.contains_key(&short));
    }
}
1485
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
    use super::*;
    use crate::assert_matches;

    /// User agents are accepted and rejected as listed below.
    #[test]
    fn test_user_agent() {
        let accepted = [
            "/radicle:1.0.0/",
            "/radicle:1.0.0/heartwood:0.9/",
            "/radicle:1.0.0/heartwood:0.9/rust:1.77/",
            "/radicle:1.0.0-rc.1/",
            "/radicle:@a.b.c/",
            "/radicle/",
            "/rad/icle/",
            "/rad:ic/le/",
        ];
        for input in accepted {
            assert!(
                UserAgent::from_str(input).is_ok(),
                "expected user agent {:?} to be accepted",
                input
            );
        }

        let rejected = [
            "/:/",
            "//",
            "",
            "radicle:1.0.0/",
            "/radicle:1.0.0",
            "/radi cle:1.0/",
            "/radi\ncle:1.0/",
        ];
        for input in rejected {
            assert!(
                UserAgent::from_str(input).is_err(),
                "expected user agent {:?} to be rejected",
                input
            );
        }
    }

    /// Aliases are accepted and rejected as listed below.
    #[test]
    fn test_alias() {
        let accepted = ["cloudhead", "cloud-head", "cl0ud.h3ad$__", "©loudhèâd"];
        for input in accepted {
            assert!(
                Alias::from_str(input).is_ok(),
                "expected alias {:?} to be accepted",
                input
            );
        }

        // One character past the limit, derived from the constant so the case
        // keeps tracking `MAX_ALIAS_LENGTH`.
        let too_long = "a".repeat(MAX_ALIAS_LENGTH + 1);
        let rejected = [
            "",
            " ",
            too_long.as_str(),
            "cloud\0head",
            "cloud head",
            "cloudhead\n",
        ];
        for input in rejected {
            assert!(
                Alias::from_str(input).is_err(),
                "expected alias {:?} to be rejected",
                input
            );
        }
    }

    /// Addresses are accepted and rejected as listed below.
    #[test]
    fn test_address() {
        let accepted = [
            "127.0.0.1:8776",
            "[::1]:8776",
            "[::ffff:127.0.0.1]:8776",
            "localhost:8776",
            "::1:8776",
        ];
        for input in accepted {
            assert!(
                Address::from_str(input).is_ok(),
                "expected address {:?} to be accepted",
                input
            );
        }

        let rejected = [
            "",
            ":",
            "127.0.0.1",
            "127.0.0.1:xyz",
            "[invalid]:8776",
            "[127.0.0.1]:8776",
        ];
        for input in rejected {
            assert!(
                Address::from_str(input).is_err(),
                "expected address {:?} to be rejected",
                input
            );
        }
    }

    /// Pins the JSON shape of [`CommandResult`] in both directions.
    #[test]
    fn test_command_result() {
        #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
        struct Test {
            value: u32,
        }

        // A successful result serializes as its bare payload, and parses back
        // from it.
        assert_eq!(json::to_string(&CommandResult::Okay(true)).unwrap(), "true");
        assert_eq!(
            json::to_string(&CommandResult::Okay(Test { value: 42 })).unwrap(),
            "{\"value\":42}"
        );
        assert_eq!(
            json::from_str::<CommandResult<Test>>("{\"value\":42}").unwrap(),
            CommandResult::Okay(Test { value: 42 })
        );

        // The convenience constructors.
        assert_eq!(json::to_string(&CommandResult::ok()).unwrap(), "{}");
        assert_eq!(
            json::to_string(&CommandResult::updated(true)).unwrap(),
            "{\"updated\":true}"
        );
        assert_eq!(
            json::to_string(&CommandResult::error(io::Error::from(
                io::ErrorKind::NotFound
            )))
            .unwrap(),
            "{\"error\":\"entity not found\"}"
        );

        // A connected state survives a serialize/deserialize round trip.
        let connected = CommandResult::Okay(State::Connected {
            since: LocalTime::now(),
            ping: Default::default(),
            latencies: VecDeque::default(),
            stable: false,
        });
        let encoded = json::to_string(&connected).unwrap();
        json::from_str::<CommandResult<State>>(&encoded).unwrap();

        // These payloads carry a `fetching` key and omit `ping`, `latencies`
        // and `stable`, yet must still parse.
        // NOTE(review): presumably an older wire shape kept for
        // compatibility; confirm.
        assert_matches!(
            json::from_str::<CommandResult<State>>(
                r#"{"connected":{"since":1699636852107,"fetching":[]}}"#
            ),
            Ok(CommandResult::Okay(_))
        );
        assert_matches!(
            json::from_str::<CommandResult<Seeds>>(
                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
            ),
            Ok(CommandResult::Okay(_))
        );
    }
}