radicle_cli/
node.rs

1use core::time;
2use std::collections::BTreeSet;
3use std::io::Write;
4
5use radicle::node::sync;
6use radicle::node::{Handle as _, NodeId};
7use radicle::storage::{ReadRepository, RepositoryError};
8use radicle::{Node, Profile};
9
10use crate::terminal as term;
11
12/// Default time to wait for syncing to complete.
13pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
14
15/// Repository sync settings.
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct SyncSettings {
18    /// Sync with at least N replicas.
19    pub replicas: sync::ReplicationFactor,
20    /// Sync with the given list of seeds.
21    pub seeds: BTreeSet<NodeId>,
22    /// How long to wait for syncing to complete.
23    pub timeout: time::Duration,
24}
25
26impl SyncSettings {
27    /// Set sync timeout. Defaults to [`DEFAULT_SYNC_TIMEOUT`].
28    pub fn timeout(mut self, timeout: time::Duration) -> Self {
29        self.timeout = timeout;
30        self
31    }
32
33    /// Set replicas.
34    pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
35        self.replicas = replicas;
36        self
37    }
38
39    /// Set seeds.
40    pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
41        self.seeds = seeds.into_iter().collect();
42        self
43    }
44
45    /// Use profile to populate sync settings, by adding preferred seeds if no seeds are specified,
46    /// and removing the local node from the set.
47    pub fn with_profile(mut self, profile: &Profile) -> Self {
48        // If no seeds were specified, add the preferred seeds.
49        if self.seeds.is_empty() {
50            self.seeds = profile
51                .config
52                .preferred_seeds
53                .iter()
54                .map(|p| p.id)
55                .collect();
56        }
57        // Remove our local node from the seed set just in case it was added by mistake.
58        self.seeds.remove(profile.id());
59        self
60    }
61}
62
63impl Default for SyncSettings {
64    fn default() -> Self {
65        Self {
66            replicas: sync::ReplicationFactor::default(),
67            seeds: BTreeSet::new(),
68            timeout: DEFAULT_SYNC_TIMEOUT,
69        }
70    }
71}
72
73/// Error while syncing.
74#[derive(thiserror::Error, Debug)]
75pub enum SyncError {
76    #[error(transparent)]
77    Repository(#[from] RepositoryError),
78    #[error(transparent)]
79    Node(#[from] radicle::node::Error),
80    #[error("all seeds timed out")]
81    AllSeedsTimedOut,
82    #[error(transparent)]
83    Target(#[from] sync::announce::TargetError),
84}
85
86impl SyncError {
87    fn is_connection_err(&self) -> bool {
88        match self {
89            Self::Node(e) => e.is_connection_err(),
90            _ => false,
91        }
92    }
93}
94
95/// Configures how sync progress is reported.
96pub struct SyncReporting {
97    /// Progress messages or animations.
98    pub progress: term::PaintTarget,
99    /// Completion messages.
100    pub completion: term::PaintTarget,
101    /// Debug output.
102    pub debug: bool,
103}
104
105impl Default for SyncReporting {
106    fn default() -> Self {
107        Self {
108            progress: term::PaintTarget::Stderr,
109            completion: term::PaintTarget::Stdout,
110            debug: false,
111        }
112    }
113}
114
115/// Announce changes to the network.
116pub fn announce<R: ReadRepository>(
117    repo: &R,
118    settings: SyncSettings,
119    reporting: SyncReporting,
120    node: &mut Node,
121    profile: &Profile,
122) -> Result<Option<sync::AnnouncerResult>, SyncError> {
123    match announce_(repo, settings, reporting, node, profile) {
124        Ok(result) => Ok(result),
125        Err(e) if e.is_connection_err() => {
126            term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
127            Ok(None)
128        }
129        Err(e) => Err(e),
130    }
131}
132
133fn announce_<R>(
134    repo: &R,
135    settings: SyncSettings,
136    reporting: SyncReporting,
137    node: &mut Node,
138    profile: &Profile,
139) -> Result<Option<sync::AnnouncerResult>, SyncError>
140where
141    R: ReadRepository,
142{
143    let me = profile.id();
144    let rid = repo.id();
145    let doc = repo.identity_doc()?;
146
147    let settings = settings.with_profile(profile);
148    let n_preferred_seeds = settings.seeds.len();
149
150    let config = match sync::PrivateNetwork::private_repo(&doc) {
151        None => {
152            let (synced, unsynced) = node.seeds(rid)?.iter().fold(
153                (BTreeSet::new(), BTreeSet::new()),
154                |(mut synced, mut unsynced), seed| {
155                    if seed.is_synced() {
156                        synced.insert(seed.nid);
157                    } else {
158                        unsynced.insert(seed.nid);
159                    }
160                    (synced, unsynced)
161                },
162            );
163            sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
164        }
165        Some(network) => {
166            let sessions = node.sessions()?;
167            let network =
168                network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
169            sync::AnnouncerConfig::private(*me, settings.replicas, network)
170        }
171    };
172    let announcer = match sync::Announcer::new(config) {
173        Ok(announcer) => announcer,
174        Err(err) => match err {
175            sync::AnnouncerError::AlreadySynced(result) => {
176                term::success!(
177                    &mut reporting.completion.writer();
178                    "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
179                    term::format::positive(result.synced()),
180                );
181                return Ok(None);
182            }
183            sync::AnnouncerError::NoSeeds => {
184                term::info!(
185                    &mut reporting.completion.writer();
186                    "{}",
187                    term::format::yellow(format!("No seeds found for {rid}."))
188                );
189                return Ok(None);
190            }
191            sync::AnnouncerError::Target(err) => return Err(err.into()),
192        },
193    };
194    let target = announcer.target();
195    let min_replicas = target.replicas().lower_bound();
196    let mut spinner = term::spinner_to(
197        format!("Found {} seed(s)..", announcer.progress().unsynced()),
198        reporting.progress.clone(),
199        reporting.completion.clone(),
200    );
201
202    match node.announce(rid, settings.timeout, announcer, |node, progress| {
203        spinner.message(format!(
204            "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
205            term::format::node_id_human_compact(node),
206            term::format::secondary(progress.preferred()),
207            term::format::secondary(n_preferred_seeds),
208            term::format::secondary(progress.synced()),
209            // N.b. the number of replicas could exceed the target if we're
210            // waiting for preferred seeds
211            term::format::secondary(min_replicas.max(progress.synced())),
212        ));
213    }) {
214        Ok(result) => {
215            spinner.message(format!(
216                "Synced with {} seed(s)",
217                term::format::positive(result.synced().len())
218            ));
219            spinner.finish();
220            Ok(Some(result))
221        }
222        Err(err) => {
223            spinner.error(format!("Sync failed: {err}"));
224            Err(err.into())
225        }
226    }
227}