Skip to main content

radicle_cli/
node.rs

1use core::time;
2use std::collections::BTreeSet;
3use std::io::Write;
4
5use radicle::node::sync;
6use radicle::node::{Handle as _, NodeId};
7use radicle::storage::{ReadRepository, RepositoryError};
8use radicle::{Node, Profile};
9
10use crate::terminal as term;
11
12/// Default time to wait for syncing to complete.
13pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
14
15/// Repository sync settings.
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct SyncSettings {
18    /// Sync with at least N replicas.
19    pub replicas: sync::ReplicationFactor,
20    /// Sync with the given list of seeds.
21    pub seeds: BTreeSet<NodeId>,
22    /// How long to wait for syncing to complete.
23    pub timeout: time::Duration,
24}
25
26impl SyncSettings {
27    /// Set sync timeout. Defaults to [`DEFAULT_SYNC_TIMEOUT`].
28    #[must_use]
29    pub fn timeout(mut self, timeout: time::Duration) -> Self {
30        self.timeout = timeout;
31        self
32    }
33
34    /// Set replicas.
35    #[must_use]
36    pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
37        self.replicas = replicas;
38        self
39    }
40
41    /// Set seeds.
42    pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
43        self.seeds = seeds.into_iter().collect();
44        self
45    }
46
47    /// Use profile to populate sync settings, by adding preferred seeds if no seeds are specified,
48    /// and removing the local node from the set.
49    #[must_use]
50    pub fn with_profile(mut self, profile: &Profile) -> Self {
51        // If no seeds were specified, add the preferred seeds.
52        if self.seeds.is_empty() {
53            self.seeds = profile
54                .config
55                .preferred_seeds
56                .iter()
57                .map(|p| p.id)
58                .collect();
59        }
60        // Remove our local node from the seed set just in case it was added by mistake.
61        self.seeds.remove(profile.id());
62        self
63    }
64}
65
66impl Default for SyncSettings {
67    fn default() -> Self {
68        Self {
69            replicas: sync::ReplicationFactor::default(),
70            seeds: BTreeSet::new(),
71            timeout: DEFAULT_SYNC_TIMEOUT,
72        }
73    }
74}
75
76/// Error while syncing.
77#[derive(thiserror::Error, Debug)]
78pub enum SyncError {
79    #[error(transparent)]
80    Repository(#[from] RepositoryError),
81    #[error(transparent)]
82    Node(#[from] radicle::node::Error),
83    #[error("all seeds timed out")]
84    AllSeedsTimedOut,
85    #[error(transparent)]
86    Target(#[from] sync::announce::TargetError),
87}
88
89impl SyncError {
90    fn is_connection_err(&self) -> bool {
91        match self {
92            Self::Node(e) => e.is_connection_err(),
93            Self::Repository(_) | Self::AllSeedsTimedOut | Self::Target(_) => false,
94        }
95    }
96}
97
98/// Configures how sync progress is reported.
99pub struct SyncReporting {
100    /// Progress messages or animations.
101    pub progress: term::PaintTarget,
102    /// Completion messages.
103    pub completion: term::PaintTarget,
104    /// Debug output.
105    pub debug: bool,
106}
107
108impl Default for SyncReporting {
109    fn default() -> Self {
110        Self {
111            progress: term::PaintTarget::Stderr,
112            completion: term::PaintTarget::Stdout,
113            debug: false,
114        }
115    }
116}
117
118/// Announce changes to the network.
119pub fn announce<R: ReadRepository>(
120    repo: &R,
121    settings: SyncSettings,
122    reporting: SyncReporting,
123    node: &mut Node,
124    profile: &Profile,
125) -> Result<Option<sync::AnnouncerResult>, SyncError> {
126    match announce_(repo, settings, reporting, node, profile) {
127        Ok(result) => Ok(result),
128        Err(e) if e.is_connection_err() => {
129            term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
130            Ok(None)
131        }
132        Err(e) => Err(e),
133    }
134}
135
136fn announce_<R>(
137    repo: &R,
138    settings: SyncSettings,
139    reporting: SyncReporting,
140    node: &mut Node,
141    profile: &Profile,
142) -> Result<Option<sync::AnnouncerResult>, SyncError>
143where
144    R: ReadRepository,
145{
146    let me = profile.id();
147    let rid = repo.id();
148    let doc = repo.identity_doc()?;
149
150    let settings = settings.with_profile(profile);
151    let n_preferred_seeds = settings.seeds.len();
152
153    let config = match sync::PrivateNetwork::private_repo(&doc) {
154        None => {
155            let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
156                (BTreeSet::new(), BTreeSet::new()),
157                |(mut synced, mut unsynced), seed| {
158                    if seed.is_synced() {
159                        synced.insert(seed.nid);
160                    } else {
161                        unsynced.insert(seed.nid);
162                    }
163                    (synced, unsynced)
164                },
165            );
166            sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
167        }
168        Some(network) => {
169            let sessions = node.sessions()?;
170            let network =
171                network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
172            sync::AnnouncerConfig::private(*me, settings.replicas, network)
173        }
174    };
175    let announcer = match sync::Announcer::new(config) {
176        Ok(announcer) => announcer,
177        Err(err) => match err {
178            sync::AnnouncerError::AlreadySynced(result) => {
179                term::success!(
180                    &mut reporting.completion.writer();
181                    "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
182                    term::format::positive(result.synced()),
183                );
184                return Ok(None);
185            }
186            sync::AnnouncerError::NoSeeds => {
187                term::info!(
188                    &mut reporting.completion.writer();
189                    "{}",
190                    term::format::yellow(format!("No seeds found for {rid}."))
191                );
192                return Ok(None);
193            }
194            sync::AnnouncerError::Target(err) => return Err(err.into()),
195        },
196    };
197    let target = announcer.target();
198    let min_replicas = target.replicas().lower_bound();
199    let mut spinner = term::spinner_to(
200        format!("Found {} seed(s)..", announcer.progress().unsynced()),
201        reporting.progress.clone(),
202        reporting.completion.clone(),
203    );
204
205    match node.announce(
206        rid,
207        [profile.did().into()],
208        settings.timeout,
209        announcer,
210        |node, progress| {
211            spinner.message(format!(
212                "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
213                term::format::node_id_human_compact(node),
214                term::format::secondary(progress.preferred()),
215                term::format::secondary(n_preferred_seeds),
216                term::format::secondary(progress.synced()),
217                // N.b. the number of replicas could exceed the target if we're
218                // waiting for preferred seeds
219                term::format::secondary(min_replicas.max(progress.synced())),
220            ));
221        },
222    ) {
223        Ok(result) => {
224            spinner.message(format!(
225                "Synced with {} seed(s)",
226                term::format::positive(result.synced().len())
227            ));
228            spinner.finish();
229            Ok(Some(result))
230        }
231        Err(err) => {
232            spinner.error(format!("Sync failed: {err}"));
233            Err(err.into())
234        }
235    }
236}