Skip to main content

radicle_cli/
node.rs

1use core::time;
2use std::collections::BTreeSet;
3use std::io::Write;
4
5use radicle::node::sync;
6use radicle::node::{Handle as _, NodeId};
7use radicle::storage::{refs, ReadRepository, RepositoryError};
8use radicle::{Node, Profile};
9
10use crate::terminal as term;
11
12/// Default time to wait for syncing to complete.
13pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
14
15/// Repository sync settings.
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct SyncSettings {
18    /// Sync with at least N replicas.
19    pub replicas: sync::ReplicationFactor,
20    /// Sync with the given list of seeds.
21    pub seeds: BTreeSet<NodeId>,
22    /// How long to wait for syncing to complete.
23    pub timeout: time::Duration,
24    /// The minimum feature level to accept when fetching signed references.
25    pub signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
26}
27
28impl SyncSettings {
29    /// Set sync timeout. Defaults to [`DEFAULT_SYNC_TIMEOUT`].
30    #[must_use]
31    pub fn timeout(mut self, timeout: time::Duration) -> Self {
32        self.timeout = timeout;
33        self
34    }
35
36    /// Set minimum feature level for fetching signed references.
37    #[must_use]
38    pub fn minimum_feature_level(mut self, feature_level: Option<refs::FeatureLevel>) -> Self {
39        self.signed_references_minimum_feature_level = feature_level;
40        self
41    }
42
43    /// Set replicas.
44    #[must_use]
45    pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
46        self.replicas = replicas;
47        self
48    }
49
50    /// Set seeds.
51    pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
52        self.seeds = seeds.into_iter().collect();
53        self
54    }
55
56    /// Use profile to populate sync settings, by adding preferred seeds if no seeds are specified,
57    /// and removing the local node from the set.
58    #[must_use]
59    pub fn with_profile(mut self, profile: &Profile) -> Self {
60        // If no seeds were specified, add the preferred seeds.
61        if self.seeds.is_empty() {
62            self.seeds = profile
63                .config
64                .preferred_seeds
65                .iter()
66                .map(|p| p.id)
67                .collect();
68        }
69        // Remove our local node from the seed set just in case it was added by mistake.
70        self.seeds.remove(profile.id());
71        self
72    }
73}
74
75impl Default for SyncSettings {
76    fn default() -> Self {
77        Self {
78            replicas: sync::ReplicationFactor::default(),
79            seeds: BTreeSet::new(),
80            timeout: DEFAULT_SYNC_TIMEOUT,
81            signed_references_minimum_feature_level: None,
82        }
83    }
84}
85
86/// Error while syncing.
87#[derive(thiserror::Error, Debug)]
88pub enum SyncError {
89    #[error(transparent)]
90    Repository(#[from] RepositoryError),
91    #[error(transparent)]
92    Node(#[from] radicle::node::Error),
93    #[error("all seeds timed out")]
94    AllSeedsTimedOut,
95    #[error(transparent)]
96    Target(#[from] sync::announce::TargetError),
97}
98
99impl SyncError {
100    fn is_connection_err(&self) -> bool {
101        match self {
102            Self::Node(e) => e.is_connection_err(),
103            Self::Repository(_) | Self::AllSeedsTimedOut | Self::Target(_) => false,
104        }
105    }
106}
107
108/// Configures how sync progress is reported.
109pub struct SyncReporting {
110    /// Progress messages or animations.
111    pub progress: term::PaintTarget,
112    /// Completion messages.
113    pub completion: term::PaintTarget,
114    /// Debug output.
115    pub debug: bool,
116}
117
118impl Default for SyncReporting {
119    fn default() -> Self {
120        Self {
121            progress: term::PaintTarget::Stderr,
122            completion: term::PaintTarget::Stdout,
123            debug: false,
124        }
125    }
126}
127
128/// Announce changes to the network.
129pub fn announce<R: ReadRepository>(
130    repo: &R,
131    settings: SyncSettings,
132    reporting: SyncReporting,
133    node: &mut Node,
134    profile: &Profile,
135) -> Result<Option<sync::AnnouncerResult>, SyncError> {
136    match announce_(repo, settings, reporting, node, profile) {
137        Ok(result) => Ok(result),
138        Err(e) if e.is_connection_err() => {
139            term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
140            Ok(None)
141        }
142        Err(e) => Err(e),
143    }
144}
145
146fn announce_<R>(
147    repo: &R,
148    settings: SyncSettings,
149    reporting: SyncReporting,
150    node: &mut Node,
151    profile: &Profile,
152) -> Result<Option<sync::AnnouncerResult>, SyncError>
153where
154    R: ReadRepository,
155{
156    let me = profile.id();
157    let rid = repo.id();
158    let doc = repo.identity_doc()?;
159
160    let settings = settings.with_profile(profile);
161    let n_preferred_seeds = settings.seeds.len();
162
163    let config = match sync::PrivateNetwork::private_repo(&doc) {
164        None => {
165            let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
166                (BTreeSet::new(), BTreeSet::new()),
167                |(mut synced, mut unsynced), seed| {
168                    if seed.is_synced() {
169                        synced.insert(seed.nid);
170                    } else {
171                        unsynced.insert(seed.nid);
172                    }
173                    (synced, unsynced)
174                },
175            );
176            sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
177        }
178        Some(network) => {
179            let sessions = node.sessions()?;
180            let network =
181                network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
182            sync::AnnouncerConfig::private(*me, settings.replicas, network)
183        }
184    };
185    let announcer = match sync::Announcer::new(config) {
186        Ok(announcer) => announcer,
187        Err(err) => match err {
188            sync::AnnouncerError::AlreadySynced(result) => {
189                term::success!(
190                    &mut reporting.completion.writer();
191                    "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
192                    term::format::positive(result.synced()),
193                );
194                return Ok(None);
195            }
196            sync::AnnouncerError::NoSeeds => {
197                term::info!(
198                    &mut reporting.completion.writer();
199                    "{}",
200                    term::format::yellow(format!("No seeds found for {rid}."))
201                );
202                return Ok(None);
203            }
204            sync::AnnouncerError::Target(err) => return Err(err.into()),
205        },
206    };
207    let target = announcer.target();
208    let min_replicas = target.replicas().lower_bound();
209    let mut spinner = term::spinner_to(
210        format!("Found {} seed(s)..", announcer.progress().unsynced()),
211        reporting.progress.clone(),
212        reporting.completion.clone(),
213    );
214
215    match node.announce(
216        rid,
217        [profile.did().into()],
218        settings.timeout,
219        announcer,
220        |node, progress| {
221            spinner.message(format!(
222                "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
223                term::format::node_id_human_compact(node),
224                term::format::secondary(progress.preferred()),
225                term::format::secondary(n_preferred_seeds),
226                term::format::secondary(progress.synced()),
227                // N.b. the number of replicas could exceed the target if we're
228                // waiting for preferred seeds
229                term::format::secondary(min_replicas.max(progress.synced())),
230            ));
231        },
232    ) {
233        Ok(result) => {
234            spinner.message(format!(
235                "Synced with {} seed(s)",
236                term::format::positive(result.synced().len())
237            ));
238            spinner.finish();
239            Ok(Some(result))
240        }
241        Err(err) => {
242            spinner.error(format!("Sync failed: {err}"));
243            Err(err.into())
244        }
245    }
246}