radicle_cli/
node.rs

1use core::time;
2use std::collections::BTreeSet;
3use std::io;
4use std::io::Write;
5
6use radicle::node::sync;
7use radicle::node::{Handle as _, NodeId};
8use radicle::storage::{ReadRepository, RepositoryError};
9use radicle::{Node, Profile};
10
11use crate::terminal as term;
12
13/// Default time to wait for syncing to complete.
14pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
15
16/// Repository sync settings.
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct SyncSettings {
19    /// Sync with at least N replicas.
20    pub replicas: sync::ReplicationFactor,
21    /// Sync with the given list of seeds.
22    pub seeds: BTreeSet<NodeId>,
23    /// How long to wait for syncing to complete.
24    pub timeout: time::Duration,
25}
26
27impl SyncSettings {
28    /// Set sync timeout. Defaults to [`DEFAULT_SYNC_TIMEOUT`].
29    pub fn timeout(mut self, timeout: time::Duration) -> Self {
30        self.timeout = timeout;
31        self
32    }
33
34    /// Set replicas.
35    pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
36        self.replicas = replicas;
37        self
38    }
39
40    /// Set seeds.
41    pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
42        self.seeds = seeds.into_iter().collect();
43        self
44    }
45
46    /// Use profile to populate sync settings, by adding preferred seeds if no seeds are specified,
47    /// and removing the local node from the set.
48    pub fn with_profile(mut self, profile: &Profile) -> Self {
49        // If no seeds were specified, add the preferred seeds.
50        if self.seeds.is_empty() {
51            self.seeds = profile
52                .config
53                .preferred_seeds
54                .iter()
55                .map(|p| p.id)
56                .collect();
57        }
58        // Remove our local node from the seed set just in case it was added by mistake.
59        self.seeds.remove(profile.id());
60        self
61    }
62}
63
64impl Default for SyncSettings {
65    fn default() -> Self {
66        Self {
67            replicas: sync::ReplicationFactor::default(),
68            seeds: BTreeSet::new(),
69            timeout: DEFAULT_SYNC_TIMEOUT,
70        }
71    }
72}
73
74/// Error while syncing.
75#[derive(thiserror::Error, Debug)]
76pub enum SyncError {
77    #[error(transparent)]
78    Repository(#[from] RepositoryError),
79    #[error(transparent)]
80    Node(#[from] radicle::node::Error),
81    #[error("all seeds timed out")]
82    AllSeedsTimedOut,
83    #[error(transparent)]
84    Target(#[from] sync::announce::TargetError),
85}
86
87impl SyncError {
88    fn is_connection_err(&self) -> bool {
89        match self {
90            Self::Node(e) => e.is_connection_err(),
91            _ => false,
92        }
93    }
94}
95
96/// Writes sync output.
97#[derive(Debug)]
98pub enum SyncWriter {
99    /// Write to standard out.
100    Stdout(io::Stdout),
101    /// Write to standard error.
102    Stderr(io::Stderr),
103    /// Discard output, like [`std::io::sink`].
104    Sink,
105}
106
107impl Clone for SyncWriter {
108    fn clone(&self) -> Self {
109        match self {
110            Self::Stdout(_) => Self::Stdout(io::stdout()),
111            Self::Stderr(_) => Self::Stderr(io::stderr()),
112            Self::Sink => Self::Sink,
113        }
114    }
115}
116
117impl io::Write for SyncWriter {
118    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
119        match self {
120            Self::Stdout(stdout) => stdout.write(buf),
121            Self::Stderr(stderr) => stderr.write(buf),
122            Self::Sink => Ok(buf.len()),
123        }
124    }
125
126    fn flush(&mut self) -> io::Result<()> {
127        match self {
128            Self::Stdout(stdout) => stdout.flush(),
129            Self::Stderr(stderr) => stderr.flush(),
130            Self::Sink => Ok(()),
131        }
132    }
133}
134
135/// Configures how sync progress is reported.
136pub struct SyncReporting {
137    /// Progress messages or animations.
138    pub progress: SyncWriter,
139    /// Completion messages.
140    pub completion: SyncWriter,
141    /// Debug output.
142    pub debug: bool,
143}
144
145impl Default for SyncReporting {
146    fn default() -> Self {
147        Self {
148            progress: SyncWriter::Stderr(io::stderr()),
149            completion: SyncWriter::Stdout(io::stdout()),
150            debug: false,
151        }
152    }
153}
154
155/// Announce changes to the network.
156pub fn announce<R: ReadRepository>(
157    repo: &R,
158    settings: SyncSettings,
159    reporting: SyncReporting,
160    node: &mut Node,
161    profile: &Profile,
162) -> Result<Option<sync::AnnouncerResult>, SyncError> {
163    match announce_(repo, settings, reporting, node, profile) {
164        Ok(result) => Ok(result),
165        Err(e) if e.is_connection_err() => {
166            term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
167            Ok(None)
168        }
169        Err(e) => Err(e),
170    }
171}
172
173fn announce_<R>(
174    repo: &R,
175    settings: SyncSettings,
176    mut reporting: SyncReporting,
177    node: &mut Node,
178    profile: &Profile,
179) -> Result<Option<sync::AnnouncerResult>, SyncError>
180where
181    R: ReadRepository,
182{
183    let me = profile.id();
184    let rid = repo.id();
185    let doc = repo.identity_doc()?;
186
187    let settings = settings.with_profile(profile);
188    let n_preferred_seeds = settings.seeds.len();
189
190    let config = match sync::PrivateNetwork::private_repo(&doc) {
191        None => {
192            let (synced, unsynced) = node.seeds(rid)?.iter().fold(
193                (BTreeSet::new(), BTreeSet::new()),
194                |(mut synced, mut unsynced), seed| {
195                    if seed.is_synced() {
196                        synced.insert(seed.nid);
197                    } else {
198                        unsynced.insert(seed.nid);
199                    }
200                    (synced, unsynced)
201                },
202            );
203            sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
204        }
205        Some(network) => {
206            let sessions = node.sessions()?;
207            let network =
208                network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
209            sync::AnnouncerConfig::private(*me, settings.replicas, network)
210        }
211    };
212    let announcer = match sync::Announcer::new(config) {
213        Ok(announcer) => announcer,
214        Err(err) => match err {
215            sync::AnnouncerError::AlreadySynced(result) => {
216                term::success!(
217                    &mut reporting.completion;
218                    "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
219                    term::format::positive(result.synced()),
220                );
221                return Ok(None);
222            }
223            sync::AnnouncerError::NoSeeds => {
224                term::info!(
225                    &mut reporting.completion;
226                    "{}",
227                    term::format::yellow(format!("No seeds found for {rid}."))
228                );
229                return Ok(None);
230            }
231            sync::AnnouncerError::Target(err) => return Err(err.into()),
232        },
233    };
234    let target = announcer.target();
235    let min_replicas = target.replicas().lower_bound();
236    let mut spinner = term::spinner_to(
237        format!("Found {} seed(s)..", announcer.progress().unsynced()),
238        reporting.completion.clone(),
239        reporting.progress.clone(),
240    );
241
242    match node.announce(rid, settings.timeout, announcer, |node, progress| {
243        spinner.message(format!(
244            "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
245            term::format::node_id_human_compact(node),
246            term::format::secondary(progress.preferred()),
247            term::format::secondary(n_preferred_seeds),
248            term::format::secondary(progress.synced()),
249            // N.b. the number of replicas could exceed the target if we're
250            // waiting for preferred seeds
251            term::format::secondary(min_replicas.max(progress.synced())),
252        ));
253    }) {
254        Ok(result) => {
255            spinner.message(format!(
256                "Synced with {} seed(s)",
257                term::format::positive(result.synced().len())
258            ));
259            spinner.finish();
260            Ok(Some(result))
261        }
262        Err(err) => {
263            spinner.error(format!("Sync failed: {err}"));
264            Err(err.into())
265        }
266    }
267}