1#![allow(clippy::type_complexity)]
2#![allow(clippy::collapsible_if)]
3mod features;
4
5pub mod address;
6pub mod command;
7pub mod config;
8pub mod db;
9pub mod device;
10pub mod events;
11pub mod notifications;
12pub mod policy;
13pub mod refs;
14pub mod routing;
15pub mod seed;
16pub mod sync;
17pub mod timestamp;
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
20use std::io::{BufRead, BufReader};
21use std::marker::PhantomData;
22use std::ops::{ControlFlow, Deref};
23use std::path::{Path, PathBuf};
24use std::str::FromStr;
25use std::{fmt, io, net, thread, time};
26
27#[cfg(unix)]
28use std::os::unix::net::UnixStream;
29#[cfg(windows)]
30use uds_windows::UnixStream;
31
32use amplify::WrapperMut;
33use cyphernet::addr::NetAddr;
34use localtime::{LocalDuration, LocalTime};
35use serde::de::DeserializeOwned;
36use serde::{Deserialize, Serialize};
37use serde_json as json;
38
39use crate::crypto::PublicKey;
40use crate::git;
41use crate::identity::RepoId;
42use crate::profile;
43use crate::storage::refs::RefsAt;
44use crate::storage::RefUpdate;
45
46pub use address::KnownAddress;
47pub use command::{Command, CommandResult, ConnectOptions, Success, DEFAULT_TIMEOUT};
48pub use config::Config;
49pub use cyphernet::addr::{HostName, PeerAddr, PeerAddrParseError};
50pub use db::Database;
51pub use events::{Event, Events};
52pub use features::Features;
53pub use radicle_core::NodeId;
54pub use seed::SyncedAt;
55pub use timestamp::Timestamp;
56
/// Protocol version number.
pub const PROTOCOL_VERSION: u8 = 1;
/// Default port number.
pub const DEFAULT_PORT: u16 = 8776;
/// Default subscription timeout (five seconds).
// NOTE(review): presumably the default for `Handle::subscribe` — confirm against callers.
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
/// Maximum length of an [`Alias`], in bytes; enforced by `Alias::from_str`.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty value at or above which [`Penalty::is_connect_threshold_reached`]
/// returns `true`.
pub const PENALTY_CONNECT_THRESHOLD: u8 = 32;
/// Penalty value at or above which [`Penalty::is_ban_threshold_reached`]
/// returns `true`.
pub const PENALTY_BAN_THRESHOLD: u8 = 64;
/// File name of the node database.
pub const NODE_DB_FILE: &str = "node.db";
/// File name of the policies database.
pub const POLICIES_DB_FILE: &str = "policies.db";
/// File name of the notifications database.
pub const NOTIFICATIONS_DB_FILE: &str = "notifications.db";
76
/// State of the ping exchange tracked for a connected peer
/// (see the `ping` field of [`State::Connected`]).
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PingState {
    /// No ping state recorded. This is the default variant.
    #[default]
    None,
    /// A response is being awaited.
    AwaitingResponse {
        /// Length associated with the outstanding ping.
        // NOTE(review): presumably the expected response payload length — confirm.
        len: u16,
        /// Time since which the response has been awaited.
        since: LocalTime,
    },
    /// The ping exchange is in the "ok" state.
    // NOTE(review): presumably set once a valid response was received — confirm.
    Ok,
}
92
/// Connection state of a session.
///
/// Serialized with camelCase variant and field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum State {
    /// Initial state.
    // NOTE(review): presumably "no connection attempted yet" — confirm.
    Initial,
    /// A connection has been attempted.
    Attempted,
    /// The session is connected.
    #[serde(rename_all = "camelCase")]
    Connected {
        /// Time since which the session has been connected.
        since: LocalTime,
        /// Ping state. Not serialized; takes its default value when deserialized.
        #[serde(skip)]
        ping: PingState,
        /// Recorded latencies. Not serialized; empty when deserialized.
        #[serde(skip)]
        latencies: VecDeque<LocalDuration>,
        /// Stability flag. Not serialized; `false` when deserialized.
        // NOTE(review): the criterion for "stable" is not visible here — confirm.
        #[serde(skip)]
        stable: bool,
    },
    /// The session is disconnected.
    #[serde(rename_all = "camelCase")]
    Disconnected {
        /// Time since which the session has been disconnected.
        since: LocalTime,
        /// Time at which a retry is scheduled.
        retry_at: LocalTime,
    },
}
126
127impl State {
128 pub fn is_connected(&self) -> bool {
130 matches!(self, Self::Connected { .. })
131 }
132}
133
134impl fmt::Display for State {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 match self {
137 Self::Initial => {
138 write!(f, "initial")
139 }
140 Self::Attempted { .. } => {
141 write!(f, "attempted")
142 }
143 Self::Connected { .. } => {
144 write!(f, "connected")
145 }
146 Self::Disconnected { .. } => {
147 write!(f, "disconnected")
148 }
149 }
150 }
151}
152
/// Severity level, with explicit numeric discriminants.
// NOTE(review): presumably the discriminant is the amount added to a
// `Penalty` — confirm against the code that applies penalties.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Severity {
    /// Low severity (discriminant `0`).
    Low = 0,
    /// Medium severity (discriminant `1`).
    Medium = 1,
    /// High severity (discriminant `8`).
    High = 8,
}
160
/// A penalty score stored as a `u8`. The default value is zero.
///
/// Compared against [`PENALTY_CONNECT_THRESHOLD`] and
/// [`PENALTY_BAN_THRESHOLD`] by the methods on this type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
pub struct Penalty(u8);
164
165impl Penalty {
166 pub fn is_connect_threshold_reached(&self) -> bool {
169 self.0 >= PENALTY_CONNECT_THRESHOLD
170 }
171
172 pub fn is_ban_threshold_reached(&self) -> bool {
173 self.0 >= PENALTY_BAN_THRESHOLD
174 }
175}
176
/// Synchronization status.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `status` key, in camelCase.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[serde(tag = "status")]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum SyncStatus {
    /// In sync.
    #[serde(rename_all = "camelCase")]
    Synced {
        /// Point at which the sync happened.
        at: SyncedAt,
    },
    /// Out of sync.
    #[serde(rename_all = "camelCase")]
    OutOfSync {
        /// Sync point on the local side.
        local: SyncedAt,
        /// Sync point on the remote side.
        remote: SyncedAt,
    },
}
198
199impl Ord for SyncStatus {
200 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
201 match (self, other) {
202 (Self::Synced { at: left }, Self::Synced { at: right }) => left.cmp(right),
203 (Self::Synced { at }, Self::OutOfSync { remote, .. }) => at.cmp(remote),
204 (Self::OutOfSync { remote, .. }, Self::Synced { at }) => remote.cmp(at),
205 (Self::OutOfSync { remote: left, .. }, Self::OutOfSync { remote: right, .. }) => {
206 left.cmp(right)
207 }
208 }
209 }
210}
211
212impl PartialOrd for SyncStatus {
213 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
214 Some(self.cmp(other))
215 }
216}
217
/// A user agent string.
///
/// Values built through `FromStr` have the form `/segment/.../`; the
/// [`Default`] value is `/radicle/`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct UserAgent(String);
221
222impl UserAgent {
223 pub fn as_str(&self) -> &str {
225 self.0.as_str()
226 }
227}
228
229impl Default for UserAgent {
230 fn default() -> Self {
231 UserAgent(String::from("/radicle/"))
232 }
233}
234
impl std::fmt::Display for UserAgent {
    /// Formats the user agent by delegating to the inner string's `fmt`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}
240
241impl FromStr for UserAgent {
242 type Err = String;
243
244 fn from_str(input: &str) -> Result<Self, Self::Err> {
245 let reserved = ['/', ':'];
246
247 if input.len() > 64 {
248 return Err(input.to_owned());
249 }
250 let Some(s) = input.strip_prefix('/') else {
251 return Err(input.to_owned());
252 };
253 let Some(s) = s.strip_suffix('/') else {
254 return Err(input.to_owned());
255 };
256 if s.is_empty() {
257 return Err(input.to_owned());
258 }
259 if s.split('/').all(|segment| {
260 if let Some((client, version)) = segment.split_once(':') {
261 if client.is_empty() || version.is_empty() {
262 false
263 } else {
264 let client = client
265 .chars()
266 .all(|c| c.is_ascii_graphic() && !reserved.contains(&c));
267 let version = version
268 .chars()
269 .all(|c| c.is_ascii_graphic() || !reserved.contains(&c));
270 client && version
271 }
272 } else {
273 true
274 }
275 }) {
276 Ok(Self(input.to_owned()))
277 } else {
278 Err(input.to_owned())
279 }
280 }
281}
282
283impl AsRef<str> for UserAgent {
284 fn as_ref(&self) -> &str {
285 self.0.as_str()
286 }
287}
288
/// A node alias.
///
/// (De)serialized as a plain string; deserialization goes through
/// `TryFrom<String>`, so the validation rules of `FromStr` apply.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Alias(
    // JSON schema constraints: pattern and maximum length of the alias string.
    #[cfg_attr(
        feature = "schemars",
        schemars(regex(pattern = r"^[^\x00-\x1F\x7F-\x9F\s]{0,32}$"), length(max = 32))
    )]
    String,
);
308
309impl Alias {
310 pub fn new(alias: impl ToString) -> Self {
312 let alias = alias.to_string();
313
314 match Self::from_str(&alias) {
315 Ok(a) => a,
316 Err(e) => panic!("Alias::new: {e}"),
317 }
318 }
319
320 pub fn as_str(&self) -> &str {
322 self.0.as_str()
323 }
324}
325
326impl From<Alias> for String {
327 fn from(value: Alias) -> Self {
328 value.0
329 }
330}
331
332impl From<&NodeId> for Alias {
333 fn from(nid: &NodeId) -> Self {
334 Alias(nid.to_string())
335 }
336}
337
impl fmt::Display for Alias {
    /// Formats the alias by delegating to the inner string's `fmt`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
343
344impl Deref for Alias {
345 type Target = str;
346
347 fn deref(&self) -> &Self::Target {
348 &self.0
349 }
350}
351
352impl AsRef<str> for Alias {
353 fn as_ref(&self) -> &str {
354 self.0.as_str()
355 }
356}
357
358impl From<&Alias> for [u8; 32] {
359 fn from(input: &Alias) -> [u8; 32] {
360 let mut alias = [0u8; 32];
361
362 alias[..input.len()].copy_from_slice(input.as_bytes());
363 alias
364 }
365}
366
/// Reasons a string is rejected as an [`Alias`].
#[derive(thiserror::Error, Debug)]
pub enum AliasError {
    /// The string is empty.
    #[error("alias cannot be empty")]
    Empty,
    /// The string is longer than [`MAX_ALIAS_LENGTH`] bytes.
    #[error("alias cannot be greater than {MAX_ALIAS_LENGTH} bytes")]
    MaxBytesExceeded,
    /// The string contains a whitespace or control character.
    #[error("alias cannot contain whitespace or control characters")]
    InvalidCharacter,
}
376
377impl FromStr for Alias {
378 type Err = AliasError;
379
380 fn from_str(s: &str) -> Result<Self, Self::Err> {
381 if s.is_empty() {
382 return Err(AliasError::Empty);
383 }
384 if s.chars().any(|c| c.is_control() || c.is_whitespace()) {
385 return Err(AliasError::InvalidCharacter);
386 }
387 if s.len() > MAX_ALIAS_LENGTH {
388 return Err(AliasError::MaxBytesExceeded);
389 }
390 Ok(Self(s.to_owned()))
391 }
392}
393
394impl TryFrom<String> for Alias {
395 type Error = AliasError;
396
397 fn try_from(value: String) -> Result<Self, Self::Error> {
398 Alias::from_str(&value)
399 }
400}
401
402impl TryFrom<&sqlite::Value> for Alias {
403 type Error = sqlite::Error;
404
405 fn try_from(value: &sqlite::Value) -> Result<Self, Self::Error> {
406 match value {
407 sqlite::Value::String(s) => Self::from_str(s).map_err(|e| sqlite::Error {
408 code: None,
409 message: Some(e.to_string()),
410 }),
411 _ => Err(sqlite::Error {
412 code: None,
413 message: Some(format!(
414 "sql: invalid type {:?} for alias, expected {:?}",
415 value.kind(),
416 sqlite::Type::String
417 )),
418 }),
419 }
420 }
421}
422
/// A network address: a host name plus a port, wrapping `NetAddr<HostName>`.
///
/// `Deref`, `DerefMut`, `Display` and `FromStr` are requested from the
/// wrapper derives; the inner address is (de)serialized through
/// `crate::serde_ext::string`.
#[derive(Clone, Eq, PartialEq, Debug, Hash, From, Wrapper, WrapperMut, Serialize, Deserialize)]
#[wrapper(Deref, Display, FromStr)]
#[wrapper_mut(DerefMut)]
#[cfg_attr(
    feature = "schemars",
    derive(schemars::JsonSchema),
    schemars(description = "\
    An IP address, or a DNS name, or a Tor onion name, followed by the symbol ':', \
    followed by a TCP port number.\
")
)]
pub struct Address(
    // For the JSON schema the address is described as a string ending in a
    // `:`-separated port number.
    #[serde(with = "crate::serde_ext::string")]
    #[cfg_attr(feature = "schemars", schemars(
        with = "String",
        regex(pattern = r"^.+:((6553[0-5])|(655[0-2][0-9])|(65[0-4][0-9]{2})|(6[0-4][0-9]{3})|([1-5][0-9]{4})|([0-5]{0,5})|([0-9]{1,4}))$"),
        extend("examples" = [
            "xmrhfasfg5suueegrnc4gsgyi2tyclcy5oz7f5drnrodmdtob6t2ioyd.onion:8776",
            "seed.example.com:8776",
            "192.0.2.0:31337",
        ]),
    ))]
    NetAddr<HostName>,
);
448
449impl Address {
450 pub fn is_local(&self) -> bool {
452 match &self.0.host {
453 HostName::Ip(ip) => address::is_local(ip),
454 HostName::Dns(name) => {
455 let name = name.strip_suffix(".").unwrap_or(name);
456
457 name.ends_with(".localhost") || name == "localhost"
460 }
461 _ => false,
462 }
463 }
464
465 pub fn is_routable(&self) -> bool {
467 match self.0.host {
468 HostName::Ip(ip) => address::is_routable(&ip),
469 HostName::Dns(_) => !self.is_local(),
470 _ => true,
471 }
472 }
473
474 pub fn host(&self) -> &HostName {
476 &self.0.host
477 }
478
479 pub fn is_onion(&self) -> bool {
481 match self.0.host {
482 HostName::Tor(_) => true,
483 _ => false,
484 }
485 }
486
487 pub fn port(&self) -> u16 {
489 self.0.port
490 }
491}
492
impl cyphernet::addr::Host for Address {
    /// Delegates to the wrapped `NetAddr<HostName>`.
    fn requires_proxy(&self) -> bool {
        self.0.requires_proxy()
    }
}
498
impl cyphernet::addr::Addr for Address {
    /// Delegates to the wrapped `NetAddr<HostName>`.
    fn port(&self) -> u16 {
        self.0.port()
    }
}
504
505impl From<net::SocketAddr> for Address {
506 fn from(addr: net::SocketAddr) -> Self {
507 Address(NetAddr {
508 host: HostName::Ip(addr.ip()),
509 port: addr.port(),
510 })
511 }
512}
513
514impl From<Address> for HostName {
515 fn from(addr: Address) -> Self {
516 addr.0.host
517 }
518}
519
/// Direction of a link. Serialized in camelCase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum Link {
    /// Outbound link.
    Outbound,
    /// Inbound link.
    Inbound,
}
530
531impl Link {
532 pub fn is_outbound(&self) -> bool {
534 matches!(self, Self::Outbound)
535 }
536
537 pub fn is_inbound(&self) -> bool {
539 matches!(self, Self::Inbound)
540 }
541}
542
543impl std::fmt::Display for Link {
544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545 match self {
546 Link::Outbound => write!(f, "outbound"),
547 Link::Inbound => write!(f, "inbound"),
548 }
549 }
550}
551
/// A session with a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Session {
    /// Id of the node the session is with.
    pub nid: NodeId,
    /// Direction of the link.
    pub link: Link,
    /// Address of the session.
    pub addr: Address,
    /// Connection state of the session.
    pub state: State,
}
561
562impl Session {
563 pub fn is_connected(&self) -> bool {
565 self.state.is_connected()
566 }
567}
568
/// A seed node, with its known addresses and optional state information.
///
/// Serialized in camelCase; `state` and `sync` are left out when `None` and
/// default to `None` when absent from the input.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seed {
    /// Id of the seed node.
    pub nid: NodeId,
    /// Known addresses of the seed.
    pub addrs: Vec<KnownAddress>,
    /// Connection state, if known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state: Option<State>,
    /// Sync status, if known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sync: Option<SyncStatus>,
}
585
586impl Seed {
587 pub fn is_connected(&self) -> bool {
589 matches!(self.state, Some(State::Connected { .. }))
590 }
591
592 pub fn is_synced(&self) -> bool {
594 matches!(self.sync, Some(SyncStatus::Synced { .. }))
595 }
596
597 pub fn new(
598 nid: NodeId,
599 addrs: Vec<KnownAddress>,
600 state: Option<State>,
601 sync: Option<SyncStatus>,
602 ) -> Self {
603 Self {
604 nid,
605 addrs,
606 state,
607 sync,
608 }
609 }
610}
611
/// A collection of seeds keyed by node id, backed by an
/// `address::AddressBook`.
///
/// (De)serialized as a plain `Vec<Seed>`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(into = "Vec<Seed>", from = "Vec<Seed>")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct Seeds(
    // The JSON schema describes the collection as a list of seeds.
    #[cfg_attr(feature = "schemars", schemars(with = "Vec<Seed>"))]
    address::AddressBook<NodeId, Seed>,
);
621
622impl Seeds {
623 pub fn new(rng: fastrand::Rng) -> Self {
625 Self(address::AddressBook::new(rng))
626 }
627
628 pub fn insert(&mut self, seed: Seed) {
630 self.0.insert(seed.nid, seed);
631 }
632
633 pub fn contains(&self, nid: &NodeId) -> bool {
635 self.0.contains_key(nid)
636 }
637
638 pub fn len(&self) -> usize {
640 self.0.len()
641 }
642
643 pub fn is_empty(&self) -> bool {
645 self.0.is_empty()
646 }
647
648 pub fn partition(&self) -> (Vec<Seed>, Vec<Seed>) {
651 self.iter().cloned().partition(|s| s.is_connected())
652 }
653
654 pub fn connected(&self) -> impl Iterator<Item = &Seed> {
656 self.iter().filter(|s| s.is_connected())
657 }
658
659 pub fn iter(&self) -> impl Iterator<Item = &Seed> {
661 self.0.shuffled().map(|(_, v)| v)
662 }
663
664 pub fn is_connected(&self, nid: &NodeId) -> bool {
666 self.0.get(nid).is_some_and(|s| s.is_connected())
667 }
668
669 pub fn with(self, rng: fastrand::Rng) -> Self {
671 Self(self.0.with(rng))
672 }
673}
674
675impl From<Seeds> for Vec<Seed> {
676 fn from(seeds: Seeds) -> Vec<Seed> {
677 seeds.0.into_shuffled().map(|(_, v)| v).collect()
678 }
679}
680
681impl From<Vec<Seed>> for Seeds {
682 fn from(other: Vec<Seed>) -> Seeds {
683 Seeds(address::AddressBook::from_iter(
684 other.into_iter().map(|s| (s.nid, s)),
685 ))
686 }
687}
688
/// Outcome of a fetch.
///
/// Serialized as an internally tagged enum: the camelCase variant name is
/// stored under the `status` key.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum FetchResult {
    /// The fetch succeeded.
    Success {
        /// Reference updates produced by the fetch.
        updated: Vec<RefUpdate>,
        /// Namespaces (node ids) associated with the fetch.
        namespaces: HashSet<NodeId>,
        /// Clone flag.
        // NOTE(review): presumably `true` when the fetch was an initial clone — confirm.
        clone: bool,
    },
    /// The fetch failed.
    Failed {
        /// Description of the failure.
        reason: String,
    },
}
703
704impl FetchResult {
705 pub fn is_success(&self) -> bool {
706 matches!(self, FetchResult::Success { .. })
707 }
708
709 pub fn success(self) -> Option<(Vec<RefUpdate>, HashSet<NodeId>)> {
710 match self {
711 Self::Success {
712 updated,
713 namespaces,
714 ..
715 } => Some((updated, namespaces)),
716 _ => None,
717 }
718 }
719
720 pub fn find_updated(&self, name: &git::fmt::RefStr) -> Option<RefUpdate> {
721 let updated = match self {
722 Self::Success { updated, .. } => Some(updated),
723 _ => None,
724 }?;
725 updated.iter().find(|up| up.name() == name).cloned()
726 }
727}
728
729impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>> for FetchResult {
730 fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>) -> Self {
731 match value {
732 Ok((updated, namespaces, clone)) => Self::Success {
733 updated,
734 namespaces,
735 clone,
736 },
737 Err(err) => Self::Failed {
738 reason: err.to_string(),
739 },
740 }
741 }
742}
743
/// An insertion-ordered list of fetch results, one `(NodeId, FetchResult)`
/// pair per entry. Empty by default.
#[derive(Clone, Debug, Default)]
pub struct FetchResults(Vec<(NodeId, FetchResult)>);
747
748impl FetchResults {
749 pub fn push(&mut self, nid: NodeId, result: FetchResult) {
751 self.0.push((nid, result));
752 }
753
754 pub fn contains(&self, nid: &NodeId) -> bool {
756 self.0.iter().any(|(n, _)| n == nid)
757 }
758
759 pub fn get(&self, nid: &NodeId) -> Option<&FetchResult> {
761 self.0.iter().find(|(n, _)| n == nid).map(|(_, r)| r)
762 }
763
764 pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &FetchResult)> {
766 self.0.iter().map(|(nid, r)| (nid, r))
767 }
768
769 pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate], HashSet<NodeId>)> {
771 self.0.iter().filter_map(|(nid, r)| {
772 if let FetchResult::Success {
773 updated,
774 namespaces,
775 ..
776 } = r
777 {
778 Some((nid, updated.as_slice(), namespaces.clone()))
779 } else {
780 None
781 }
782 })
783 }
784
785 pub fn failed(&self) -> impl Iterator<Item = (&NodeId, &str)> {
787 self.0.iter().filter_map(|(nid, r)| {
788 if let FetchResult::Failed { reason } = r {
789 Some((nid, reason.as_str()))
790 } else {
791 None
792 }
793 })
794 }
795}
796
797impl From<Vec<(NodeId, FetchResult)>> for FetchResults {
798 fn from(value: Vec<(NodeId, FetchResult)>) -> Self {
799 Self(value)
800 }
801}
802
803impl Deref for FetchResults {
804 type Target = [(NodeId, FetchResult)];
805
806 fn deref(&self) -> &Self::Target {
807 self.0.as_slice()
808 }
809}
810
811impl IntoIterator for FetchResults {
812 type Item = (NodeId, FetchResult);
813 type IntoIter = std::vec::IntoIter<(NodeId, FetchResult)>;
814
815 fn into_iter(self) -> Self::IntoIter {
816 self.0.into_iter()
817 }
818}
819
/// Errors produced when talking to a node over its control socket.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An I/O error.
    #[error("i/o: {0}")]
    Io(#[from] io::Error),
    /// An error described by a message from the node.
    #[error("node: {0}")]
    Node(String),
    /// Reading from the control socket timed out.
    #[error("timed out reading from control socket")]
    TimedOut,
    /// Opening the control socket at the given path failed with the given
    /// I/O error kind.
    #[error("failed to open node control socket {0:?} ({1})")]
    Connect(PathBuf, io::ErrorKind),
    /// The command returned an error result.
    #[error("command error: {reason}")]
    Command { reason: String },
    /// A response line could not be parsed as JSON.
    #[error("received invalid json `{response}` in response to command: {error}")]
    InvalidJson {
        /// The line that failed to parse.
        response: String,
        /// The parse error.
        error: json::Error,
    },
    /// No response line was received.
    #[error("received empty response for command")]
    EmptyResponse,
}
841
842impl Error {
843 pub fn is_connection_err(&self) -> bool {
845 matches!(self, Self::Connect { .. })
846 }
847}
848
/// Result of a connect command.
///
/// Serialized as an internally tagged enum: the camelCase variant name is
/// stored under the `status` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum ConnectResult {
    /// The connection was established.
    Connected,
    /// The node is disconnected, for the given reason.
    Disconnected { reason: String },
}
856
/// A handle for issuing commands to a node.
///
/// The method descriptions below follow from the signatures; the exact
/// semantics are defined by each implementation.
pub trait Handle: Clone + Sync + Send {
    /// Type returned by [`Handle::sessions`].
    type Sessions;
    /// Collection of events returned by [`Handle::subscribe`].
    type Events: IntoIterator<Item = Self::Event>;
    /// A single item of [`Handle::Events`].
    type Event;
    /// Error type returned by the handle's operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Returns the node's id.
    fn nid(&self) -> Result<NodeId, Self::Error>;
    /// Reports whether the node is running.
    fn is_running(&self) -> bool;
    /// Returns the socket addresses the node listens on.
    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error>;
    /// Returns the node's configuration.
    fn config(&self) -> Result<config::Config, Self::Error>;
    /// Connects to `node` at `addr` with the given options.
    fn connect(
        &mut self,
        node: NodeId,
        addr: Address,
        opts: ConnectOptions,
    ) -> Result<ConnectResult, Self::Error>;
    /// Disconnects from `node`.
    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error>;

    /// Looks up the seeds of a repository, using this node's own id as the
    /// only namespace.
    #[deprecated(note = "use `seeds_for` instead")]
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
        self.seeds_for(id, [self.nid()?])
    }

    /// Looks up the seeds of a repository for the given namespaces.
    fn seeds_for(
        &mut self,
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<Seeds, Self::Error>;

    /// Fetches repository `id` from node `from`, with the given timeout.
    fn fetch(
        &mut self,
        id: RepoId,
        from: NodeId,
        timeout: time::Duration,
    ) -> Result<FetchResult, Self::Error>;
    /// Seeds repository `id` with the given scope.
    // NOTE(review): the returned `bool` presumably tells whether the policy
    // changed (`Success::updated` in the `Node` implementation); the same
    // applies to `follow`, `block`, `unseed`, `unfollow` and `add_inventory`.
    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Self::Error>;
    /// Follows node `id`, optionally under an alias.
    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>;
    /// Blocks node `id`.
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error>;
    /// Stops seeding repository `id`.
    fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
    /// Stops following node `id`.
    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error>;

    /// Announces the refs of a repository, using this node's own id as the
    /// only namespace.
    #[deprecated(note = "use `announce_refs_for` instead")]
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
        self.announce_refs_for(id, [self.nid()?])
    }

    /// Announces the refs of a repository for the given namespaces.
    fn announce_refs_for(
        &mut self,
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<RefsAt, Self::Error>;

    /// Announces the node's inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
    /// Adds repository `rid` to the inventory.
    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Self::Error>;
    /// Shuts the node down, consuming the handle.
    fn shutdown(self) -> Result<(), Self::Error>;
    /// Returns the node's sessions.
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
    /// Returns the session with `node`, if there is one.
    fn session(&self, node: NodeId) -> Result<Option<Session>, Self::Error>;
    /// Subscribes to node events, using `timeout` for the subscription.
    fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
    /// Returns debug information as a JSON value.
    fn debug(&self) -> Result<json::Value, Self::Error>;
}
947
/// Iterator over the response lines of a command, each line decoded as a
/// `CommandResult<T>`.
pub struct LineIter<T> {
    /// Buffered control socket the lines are read from.
    stream: BufReader<UnixStream>,
    /// Read timeout applied to the socket before each line is read.
    timeout: time::Duration,
    /// Marker tying the iterator to its item type without storing a `T`.
    witness: PhantomData<T>,
}
957
958impl<T: DeserializeOwned> Iterator for LineIter<T> {
959 type Item = Result<T, Error>;
960
961 fn next(&mut self) -> Option<Self::Item> {
962 let mut l = String::new();
963
964 self.stream
965 .get_ref()
966 .set_read_timeout(Some(self.timeout))
967 .ok();
968
969 match self.stream.read_line(&mut l) {
970 Ok(0) => None,
971 Ok(_) => {
972 let result: CommandResult<T> = match json::from_str(&l) {
973 Err(e) => {
974 return Some(Err(Error::InvalidJson {
975 response: l.clone(),
976 error: e,
977 }))
978 }
979 Ok(result) => result,
980 };
981 match result {
982 CommandResult::Okay(result) => Some(Ok(result)),
983 CommandResult::Error { reason } => Some(Err(Error::Command { reason })),
984 }
985 }
986 Err(e) => match e.kind() {
987 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Some(Err(Error::TimedOut)),
988 _ => Some(Err(Error::Io(e))),
989 },
990 }
991 }
992}
993
/// Client for a node's control socket.
#[derive(Debug, Clone)]
pub struct Node {
    /// Path of the control socket.
    socket: PathBuf,
}
999
1000impl Node {
1001 pub fn new<P: AsRef<Path>>(path: P) -> Self {
1003 Self {
1004 socket: path.as_ref().to_path_buf(),
1005 }
1006 }
1007
1008 pub fn call<T: DeserializeOwned + Send + 'static>(
1010 &self,
1011 cmd: Command,
1012 timeout: time::Duration,
1013 ) -> Result<LineIter<T>, Error> {
1014 let mut stream = UnixStream::connect(&self.socket)
1015 .map_err(|e| Error::Connect(self.socket.clone(), e.kind()))?;
1016 cmd.to_writer(&mut stream)?;
1017 Ok(LineIter {
1018 stream: BufReader::new(stream),
1019 timeout,
1020 witness: PhantomData,
1021 })
1022 }
1023
1024 pub fn announce(
1028 &mut self,
1029 rid: RepoId,
1030 namespaces: impl IntoIterator<Item = PublicKey>,
1031 timeout: time::Duration,
1032 mut announcer: sync::Announcer,
1033 mut report: impl FnMut(&NodeId, sync::announce::Progress),
1034 ) -> Result<sync::AnnouncerResult, Error> {
1035 let mut events = self.subscribe(timeout)?;
1036 let refs = self.announce_refs_for(rid, namespaces)?;
1037
1038 let started = time::Instant::now();
1039
1040 loop {
1041 let Some(e) = events.next() else {
1042 return Ok(announcer.timed_out());
1045 };
1046 let elapsed = started.elapsed();
1047 if elapsed >= timeout {
1048 return Ok(announcer.timed_out());
1049 }
1050 match e {
1051 Ok(Event::RefsSynced {
1052 remote,
1053 rid: rid_,
1054 at,
1055 }) if rid == rid_ && refs.at == at => {
1056 log::debug!(target: "radicle", "Received {e:?}");
1057 match announcer.synced_with(remote, elapsed) {
1058 ControlFlow::Continue(progress) => {
1059 report(&remote, progress);
1060 }
1061 ControlFlow::Break(finished) => {
1062 return Ok(finished.into());
1063 }
1064 }
1065 }
1066 Ok(_) => {}
1067
1068 Err(Error::TimedOut) => {
1069 return Ok(announcer.timed_out());
1070 }
1071 Err(e) => return Err(e),
1072 }
1073 announcer = match announcer.can_continue() {
1076 ControlFlow::Continue(cont) => cont,
1077 ControlFlow::Break(finished) => return Ok(finished.into()),
1078 };
1079 }
1080 }
1081}
1082
1083impl Handle for Node {
1086 type Sessions = Vec<Session>;
1087 type Events = LineIter<Event>;
1088 type Event = Result<Event, Error>;
1089 type Error = Error;
1090
1091 fn nid(&self) -> Result<NodeId, Error> {
1092 self.call::<NodeId>(Command::NodeId, DEFAULT_TIMEOUT)?
1093 .next()
1094 .ok_or(Error::EmptyResponse)?
1095 }
1096
1097 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Error> {
1098 self.call::<Vec<net::SocketAddr>>(Command::ListenAddrs, DEFAULT_TIMEOUT)?
1099 .next()
1100 .ok_or(Error::EmptyResponse)?
1101 }
1102
1103 fn is_running(&self) -> bool {
1104 let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
1105 return false;
1106 };
1107
1108 let Some(Ok(_)) = lines.next() else {
1109 return false;
1110 };
1111 true
1112 }
1113
1114 fn config(&self) -> Result<config::Config, Error> {
1115 self.call::<config::Config>(Command::Config, DEFAULT_TIMEOUT)?
1116 .next()
1117 .ok_or(Error::EmptyResponse)?
1118 }
1119
1120 fn connect(
1121 &mut self,
1122 nid: NodeId,
1123 addr: Address,
1124 opts: ConnectOptions,
1125 ) -> Result<ConnectResult, Error> {
1126 let timeout = opts.timeout;
1127 let result = self
1128 .call::<ConnectResult>(
1129 Command::Connect {
1130 addr: (nid, addr).into(),
1131 opts,
1132 },
1133 timeout,
1134 )?
1135 .next()
1136 .ok_or(Error::EmptyResponse)??;
1137
1138 Ok(result)
1139 }
1140
1141 fn disconnect(&mut self, nid: NodeId) -> Result<(), Self::Error> {
1142 self.call::<ConnectResult>(Command::Disconnect { nid }, DEFAULT_TIMEOUT)?
1143 .next()
1144 .ok_or(Error::EmptyResponse)??;
1145
1146 Ok(())
1147 }
1148
1149 fn seeds_for(
1150 &mut self,
1151 rid: RepoId,
1152 namespaces: impl IntoIterator<Item = PublicKey>,
1153 ) -> Result<Seeds, Error> {
1154 let seeds = self
1155 .call::<Seeds>(
1156 Command::SeedsFor {
1157 rid,
1158 namespaces: HashSet::from_iter(namespaces),
1159 },
1160 DEFAULT_TIMEOUT,
1161 )?
1162 .next()
1163 .ok_or(Error::EmptyResponse)??;
1164
1165 Ok(seeds.with(profile::env::rng()))
1166 }
1167
1168 fn fetch(
1169 &mut self,
1170 rid: RepoId,
1171 from: NodeId,
1172 timeout: time::Duration,
1173 ) -> Result<FetchResult, Error> {
1174 let result = self
1175 .call(
1176 Command::Fetch {
1177 rid,
1178 nid: from,
1179 timeout,
1180 },
1181 DEFAULT_TIMEOUT.max(timeout),
1182 )?
1183 .next()
1184 .ok_or(Error::EmptyResponse)??;
1185
1186 Ok(result)
1187 }
1188
1189 fn follow(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
1190 let mut lines = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
1191 let response = lines.next().ok_or(Error::EmptyResponse)??;
1192
1193 Ok(response.updated)
1194 }
1195
1196 fn block(&mut self, nid: NodeId) -> Result<bool, Error> {
1197 let mut lines = self.call::<Success>(Command::Block { nid }, DEFAULT_TIMEOUT)?;
1198 let response = lines.next().ok_or(Error::EmptyResponse)??;
1199
1200 Ok(response.updated)
1201 }
1202
1203 fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
1204 let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
1205 let response = lines.next().ok_or(Error::EmptyResponse)??;
1206
1207 Ok(response.updated)
1208 }
1209
1210 fn unfollow(&mut self, nid: NodeId) -> Result<bool, Error> {
1211 let mut lines = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
1212 let response = lines.next().ok_or(Error::EmptyResponse)??;
1213
1214 Ok(response.updated)
1215 }
1216
1217 fn unseed(&mut self, rid: RepoId) -> Result<bool, Error> {
1218 let mut lines = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
1219 let response = lines.next().ok_or(Error::EmptyResponse)??;
1220
1221 Ok(response.updated)
1222 }
1223
1224 fn announce_refs_for(
1225 &mut self,
1226 rid: RepoId,
1227 namespaces: impl IntoIterator<Item = PublicKey>,
1228 ) -> Result<RefsAt, Error> {
1229 let refs: RefsAt = self
1230 .call(
1231 Command::AnnounceRefsFor {
1232 rid,
1233 namespaces: HashSet::from_iter(namespaces),
1234 },
1235 DEFAULT_TIMEOUT,
1236 )?
1237 .next()
1238 .ok_or(Error::EmptyResponse)??;
1239
1240 Ok(refs)
1241 }
1242
1243 fn announce_inventory(&mut self) -> Result<(), Error> {
1244 for line in self.call::<Success>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? {
1245 line?;
1246 }
1247 Ok(())
1248 }
1249
1250 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1251 let mut lines = self.call::<Success>(Command::AddInventory { rid }, DEFAULT_TIMEOUT)?;
1252 let response = lines.next().ok_or(Error::EmptyResponse)??;
1253
1254 Ok(response.updated)
1255 }
1256
1257 fn subscribe(&self, timeout: time::Duration) -> Result<LineIter<Event>, Error> {
1258 self.call(Command::Subscribe, timeout)
1259 }
1260
1261 fn sessions(&self) -> Result<Self::Sessions, Error> {
1262 let sessions = self
1263 .call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)?
1264 .next()
1265 .ok_or(Error::EmptyResponse)??;
1266
1267 Ok(sessions)
1268 }
1269
1270 fn session(&self, nid: NodeId) -> Result<Option<Session>, Error> {
1271 let session = self
1272 .call::<Option<Session>>(Command::Session { nid }, DEFAULT_TIMEOUT)?
1273 .next()
1274 .ok_or(Error::EmptyResponse)??;
1275
1276 Ok(session)
1277 }
1278
1279 fn debug(&self) -> Result<json::Value, Self::Error> {
1280 let debug = self
1281 .call::<json::Value>(Command::Debug, DEFAULT_TIMEOUT)?
1282 .next()
1283 .ok_or(Error::EmptyResponse {})??;
1284
1285 Ok(debug)
1286 }
1287
1288 fn shutdown(self) -> Result<(), Error> {
1289 for line in self.call::<Success>(Command::Shutdown, DEFAULT_TIMEOUT)? {
1290 line?;
1291 }
1292 while self.is_running() {
1294 thread::sleep(time::Duration::from_secs(1));
1295 }
1296 Ok(())
1297 }
1298}
1299
/// A store that maps node ids to aliases and back.
pub trait AliasStore {
    /// Returns the alias of `nid`, if one is known.
    fn alias(&self, nid: &NodeId) -> Option<Alias>;

    /// Returns the node ids matching `alias`, grouped by the alias they are
    /// stored under.
    // NOTE(review): `properties::test_reverse_lookup` expects a needle to also
    // match longer aliases that contain it, and to match regardless of ASCII
    // case — confirm this is the intended contract for all implementations.
    fn reverse_lookup(&self, alias: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>>;
}
1312
1313impl AliasStore for HashMap<NodeId, Alias> {
1314 fn alias(&self, nid: &NodeId) -> Option<Alias> {
1315 self.get(nid).map(ToOwned::to_owned)
1316 }
1317
1318 fn reverse_lookup(&self, needle: &Alias) -> BTreeMap<Alias, BTreeSet<NodeId>> {
1319 self.iter()
1320 .fold(BTreeMap::new(), |mut result, (node, alias)| {
1321 if alias.contains(needle.as_str()) {
1322 let nodes = result.entry(alias.clone()).or_default();
1323 nodes.insert(*node);
1324 }
1325 result
1326 })
1327 }
1328}
1329
/// Reusable property checks for [`AliasStore`] implementations.
#[cfg(test)]
pub(crate) mod properties {
    use std::collections::BTreeSet;

    use crate::node::{Alias, NodeId};
    use crate::test::arbitrary;

    use super::AliasStore;

    /// Input for [`test_reverse_lookup`]: two aliases, each paired with a set
    /// of node ids. The `long` alias starts with the `short` one.
    pub struct AliasInput {
        short: (Alias, BTreeSet<NodeId>),
        long: (Alias, BTreeSet<NodeId>),
    }

    impl AliasInput {
        /// Generates a fresh input.
        pub fn new() -> Self {
            let short = arbitrary::gen::<Alias>(0);
            let long = {
                // The long alias is the short alias followed by another
                // generated alias, so the short one is a prefix of it.
                let mut a = short.to_string();
                a.push_str(arbitrary::gen::<Alias>(1).as_str());
                Alias::new(a)
            };
            Self {
                short: (short, arbitrary::vec::<NodeId>(3).into_iter().collect()),
                long: (long, arbitrary::vec::<NodeId>(2).into_iter().collect()),
            }
        }

        /// The short alias and its node ids.
        pub fn short(&self) -> &(Alias, BTreeSet<NodeId>) {
            &self.short
        }

        /// The long alias and its node ids.
        pub fn long(&self) -> &(Alias, BTreeSet<NodeId>) {
            &self.long
        }
    }

    /// Checks `reverse_lookup` on a store against the given input.
    // NOTE(review): this assumes the store was already populated with the
    // aliases and node ids from the input — confirm at the call sites.
    pub fn test_reverse_lookup(store: &impl AliasStore, AliasInput { short, long }: AliasInput) {
        let (short, short_ids) = short;
        let (long, long_ids) = long;
        // Looking up the short alias returns both aliases, since the long
        // alias contains the short one.
        let first = store.reverse_lookup(&short);
        assert_eq!(first.get(&short), Some(&short_ids),);
        assert_eq!(first.get(&long), Some(&long_ids));

        // Looking up the long alias returns only the long alias.
        let second = store.reverse_lookup(&long);
        assert_eq!(second.get(&short), None);
        assert_eq!(second.get(&long), Some(&long_ids));

        // Lookup is expected to ignore ASCII case: a needle with alternating
        // upper and lower case must still find the short alias.
        let mixed_case = Alias::new(
            short
                .as_str()
                .chars()
                .enumerate()
                .map(|(i, c)| {
                    if i % 2 == 0 {
                        c.to_ascii_uppercase()
                    } else {
                        c.to_ascii_lowercase()
                    }
                })
                .collect::<String>(),
        );
        let upper = store.reverse_lookup(&mixed_case);
        assert!(upper.contains_key(&short));
    }
}
1408
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
    use super::*;
    use crate::assert_matches;

    /// Accepted and rejected user agent strings.
    #[test]
    fn test_user_agent() {
        // Well-formed user agents.
        assert!(UserAgent::from_str("/radicle:1.0.0/").is_ok());
        assert!(UserAgent::from_str("/radicle:1.0.0/heartwood:0.9/").is_ok());
        assert!(UserAgent::from_str("/radicle:1.0.0/heartwood:0.9/rust:1.77/").is_ok());
        assert!(UserAgent::from_str("/radicle:1.0.0-rc.1/").is_ok());
        assert!(UserAgent::from_str("/radicle:1.0.0-rc.1/").is_ok());
        assert!(UserAgent::from_str("/radicle:@a.b.c/").is_ok());
        assert!(UserAgent::from_str("/radicle/").is_ok());
        assert!(UserAgent::from_str("/rad/icle/").is_ok());
        assert!(UserAgent::from_str("/rad:ic/le/").is_ok());

        // Malformed user agents.
        assert!(UserAgent::from_str("/:/").is_err());
        assert!(UserAgent::from_str("//").is_err());
        assert!(UserAgent::from_str("").is_err());
        assert!(UserAgent::from_str("radicle:1.0.0/").is_err());
        assert!(UserAgent::from_str("/radicle:1.0.0").is_err());
        assert!(UserAgent::from_str("/radi cle:1.0/").is_err());
        assert!(UserAgent::from_str("/radi\ncle:1.0/").is_err());
    }

    /// Accepted and rejected aliases.
    #[test]
    fn test_alias() {
        // Valid aliases, including non-ASCII characters.
        assert!(Alias::from_str("cloudhead").is_ok());
        assert!(Alias::from_str("cloud-head").is_ok());
        assert!(Alias::from_str("cl0ud.h3ad$__").is_ok());
        assert!(Alias::from_str("©loudhèâd").is_ok());

        // Empty, whitespace, over-long (33 bytes) and control characters.
        assert!(Alias::from_str("").is_err());
        assert!(Alias::from_str(" ").is_err());
        assert!(Alias::from_str("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").is_err());
        assert!(Alias::from_str("cloud\0head").is_err());
        assert!(Alias::from_str("cloud head").is_err());
        assert!(Alias::from_str("cloudhead\n").is_err());
    }

    /// JSON encoding and decoding of `CommandResult`.
    #[test]
    fn test_command_result() {
        #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
        struct Test {
            value: u32,
        }

        // `Okay` serializes as the bare payload.
        assert_eq!(json::to_string(&CommandResult::Okay(true)).unwrap(), "true");
        assert_eq!(
            json::to_string(&CommandResult::Okay(Test { value: 42 })).unwrap(),
            "{\"value\":42}"
        );
        assert_eq!(
            json::from_str::<CommandResult<Test>>("{\"value\":42}").unwrap(),
            CommandResult::Okay(Test { value: 42 })
        );
        assert_eq!(json::to_string(&CommandResult::ok()).unwrap(), "{}");
        assert_eq!(
            json::to_string(&CommandResult::updated(true)).unwrap(),
            "{\"updated\":true}"
        );
        // Errors serialize under the `error` key.
        assert_eq!(
            json::to_string(&CommandResult::error(io::Error::from(
                io::ErrorKind::NotFound
            )))
            .unwrap(),
            "{\"error\":\"entity not found\"}"
        );

        // A connected state survives a serialize/deserialize round trip.
        json::from_str::<CommandResult<State>>(
            &serde_json::to_string(&CommandResult::Okay(State::Connected {
                since: LocalTime::now(),
                ping: Default::default(),
                latencies: VecDeque::default(),
                stable: false,
            }))
            .unwrap(),
        )
        .unwrap();

        // Inputs carrying an extra `fetching` field still deserialize.
        assert_matches!(
            json::from_str::<CommandResult<State>>(
                r#"{"connected":{"since":1699636852107,"fetching":[]}}"#
            ),
            Ok(CommandResult::Okay(_))
        );
        assert_matches!(
            json::from_str::<CommandResult<Seeds>>(
                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
            ),
            Ok(CommandResult::Okay(_))
        );
    }
}