1use core::time;
2use std::collections::BTreeSet;
3use std::io::Write;
4
5use radicle::node::sync;
6use radicle::node::{Handle as _, NodeId};
7use radicle::storage::{ReadRepository, RepositoryError, refs};
8use radicle::{Node, Profile};
9
10use crate::terminal as term;
11
12pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
14
15#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct SyncSettings {
18 pub replicas: sync::ReplicationFactor,
20 pub seeds: BTreeSet<NodeId>,
22 pub timeout: time::Duration,
24 pub signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
26}
27
28impl SyncSettings {
29 #[must_use]
31 pub fn timeout(mut self, timeout: time::Duration) -> Self {
32 self.timeout = timeout;
33 self
34 }
35
36 #[must_use]
38 pub fn minimum_feature_level(mut self, feature_level: Option<refs::FeatureLevel>) -> Self {
39 self.signed_references_minimum_feature_level = feature_level;
40 self
41 }
42
43 #[must_use]
45 pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
46 self.replicas = replicas;
47 self
48 }
49
50 pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
52 self.seeds = seeds.into_iter().collect();
53 self
54 }
55
56 #[must_use]
59 pub fn with_profile(mut self, profile: &Profile) -> Self {
60 if self.seeds.is_empty() {
62 self.seeds = profile
63 .config
64 .preferred_seeds
65 .iter()
66 .map(|p| p.id)
67 .collect();
68 }
69 self.seeds.remove(profile.id());
71 self
72 }
73}
74
75impl Default for SyncSettings {
76 fn default() -> Self {
77 Self {
78 replicas: sync::ReplicationFactor::default(),
79 seeds: BTreeSet::new(),
80 timeout: DEFAULT_SYNC_TIMEOUT,
81 signed_references_minimum_feature_level: None,
82 }
83 }
84}
85
86#[derive(thiserror::Error, Debug)]
88pub enum SyncError {
89 #[error(transparent)]
90 Repository(#[from] RepositoryError),
91 #[error(transparent)]
92 Node(#[from] radicle::node::Error),
93 #[error("all seeds timed out")]
94 AllSeedsTimedOut,
95 #[error(transparent)]
96 Target(#[from] sync::announce::TargetError),
97}
98
99impl SyncError {
100 fn is_connection_err(&self) -> bool {
101 match self {
102 Self::Node(e) => e.is_connection_err(),
103 Self::Repository(_) | Self::AllSeedsTimedOut | Self::Target(_) => false,
104 }
105 }
106}
107
108pub struct SyncReporting {
110 pub progress: term::PaintTarget,
112 pub completion: term::PaintTarget,
114 pub debug: bool,
116}
117
118impl Default for SyncReporting {
119 fn default() -> Self {
120 Self {
121 progress: term::PaintTarget::Stderr,
122 completion: term::PaintTarget::Stdout,
123 debug: false,
124 }
125 }
126}
127
128pub fn announce<R: ReadRepository>(
130 repo: &R,
131 settings: SyncSettings,
132 reporting: SyncReporting,
133 node: &mut Node,
134 profile: &Profile,
135) -> Result<Option<sync::AnnouncerResult>, SyncError> {
136 match announce_(repo, settings, reporting, node, profile) {
137 Ok(result) => Ok(result),
138 Err(e) if e.is_connection_err() => {
139 term::hint(
140 "Node is stopped. To announce changes to the network, start it with `rad node start`.",
141 );
142 Ok(None)
143 }
144 Err(e) => Err(e),
145 }
146}
147
148fn announce_<R>(
149 repo: &R,
150 settings: SyncSettings,
151 reporting: SyncReporting,
152 node: &mut Node,
153 profile: &Profile,
154) -> Result<Option<sync::AnnouncerResult>, SyncError>
155where
156 R: ReadRepository,
157{
158 let me = profile.id();
159 let rid = repo.id();
160 let doc = repo.identity_doc()?;
161
162 let settings = settings.with_profile(profile);
163 let n_preferred_seeds = settings.seeds.len();
164
165 let config = match sync::PrivateNetwork::private_repo(&doc) {
166 None => {
167 let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
168 (BTreeSet::new(), BTreeSet::new()),
169 |(mut synced, mut unsynced), seed| {
170 if seed.is_synced() {
171 synced.insert(seed.nid);
172 } else {
173 unsynced.insert(seed.nid);
174 }
175 (synced, unsynced)
176 },
177 );
178 sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
179 }
180 Some(network) => {
181 let sessions = node.sessions()?;
182 let network =
183 network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
184 sync::AnnouncerConfig::private(*me, settings.replicas, network)
185 }
186 };
187 let announcer = match sync::Announcer::new(config) {
188 Ok(announcer) => announcer,
189 Err(err) => match err {
190 sync::AnnouncerError::AlreadySynced(result) => {
191 term::success!(
192 &mut reporting.completion.writer();
193 "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
194 term::format::positive(result.synced()),
195 );
196 return Ok(None);
197 }
198 sync::AnnouncerError::NoSeeds => {
199 term::info!(
200 &mut reporting.completion.writer();
201 "{}",
202 term::format::yellow(format!("No seeds found for {rid}."))
203 );
204 return Ok(None);
205 }
206 sync::AnnouncerError::Target(err) => return Err(err.into()),
207 },
208 };
209 let target = announcer.target();
210 let min_replicas = target.replicas().lower_bound();
211 let mut spinner = term::spinner_to(
212 format!("Found {} seed(s)..", announcer.progress().unsynced()),
213 reporting.progress.clone(),
214 reporting.completion.clone(),
215 );
216
217 match node.announce(
218 rid,
219 [profile.did().into()],
220 settings.timeout,
221 announcer,
222 |node, progress| {
223 spinner.message(format!(
224 "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
225 term::format::node_id_human_compact(node),
226 term::format::secondary(progress.preferred()),
227 term::format::secondary(n_preferred_seeds),
228 term::format::secondary(progress.synced()),
229 term::format::secondary(min_replicas.max(progress.synced())),
232 ));
233 },
234 ) {
235 Ok(result) => {
236 spinner.message(format!(
237 "Synced with {} seed(s)",
238 term::format::positive(result.synced().len())
239 ));
240 spinner.finish();
241 Ok(Some(result))
242 }
243 Err(err) => {
244 spinner.error(format!("Sync failed: {err}"));
245 Err(err.into())
246 }
247 }
248}