1use core::time;
2use std::collections::BTreeSet;
3use std::io::Write;
4
5use radicle::node::sync;
6use radicle::node::{Handle as _, NodeId};
7use radicle::storage::{refs, ReadRepository, RepositoryError};
8use radicle::{Node, Profile};
9
10use crate::terminal as term;
11
12pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
14
15#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct SyncSettings {
18 pub replicas: sync::ReplicationFactor,
20 pub seeds: BTreeSet<NodeId>,
22 pub timeout: time::Duration,
24 pub signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
26}
27
28impl SyncSettings {
29 #[must_use]
31 pub fn timeout(mut self, timeout: time::Duration) -> Self {
32 self.timeout = timeout;
33 self
34 }
35
36 #[must_use]
38 pub fn minimum_feature_level(mut self, feature_level: Option<refs::FeatureLevel>) -> Self {
39 self.signed_references_minimum_feature_level = feature_level;
40 self
41 }
42
43 #[must_use]
45 pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
46 self.replicas = replicas;
47 self
48 }
49
50 pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
52 self.seeds = seeds.into_iter().collect();
53 self
54 }
55
56 #[must_use]
59 pub fn with_profile(mut self, profile: &Profile) -> Self {
60 if self.seeds.is_empty() {
62 self.seeds = profile
63 .config
64 .preferred_seeds
65 .iter()
66 .map(|p| p.id)
67 .collect();
68 }
69 self.seeds.remove(profile.id());
71 self
72 }
73}
74
75impl Default for SyncSettings {
76 fn default() -> Self {
77 Self {
78 replicas: sync::ReplicationFactor::default(),
79 seeds: BTreeSet::new(),
80 timeout: DEFAULT_SYNC_TIMEOUT,
81 signed_references_minimum_feature_level: None,
82 }
83 }
84}
85
86#[derive(thiserror::Error, Debug)]
88pub enum SyncError {
89 #[error(transparent)]
90 Repository(#[from] RepositoryError),
91 #[error(transparent)]
92 Node(#[from] radicle::node::Error),
93 #[error("all seeds timed out")]
94 AllSeedsTimedOut,
95 #[error(transparent)]
96 Target(#[from] sync::announce::TargetError),
97}
98
99impl SyncError {
100 fn is_connection_err(&self) -> bool {
101 match self {
102 Self::Node(e) => e.is_connection_err(),
103 Self::Repository(_) | Self::AllSeedsTimedOut | Self::Target(_) => false,
104 }
105 }
106}
107
108pub struct SyncReporting {
110 pub progress: term::PaintTarget,
112 pub completion: term::PaintTarget,
114 pub debug: bool,
116}
117
118impl Default for SyncReporting {
119 fn default() -> Self {
120 Self {
121 progress: term::PaintTarget::Stderr,
122 completion: term::PaintTarget::Stdout,
123 debug: false,
124 }
125 }
126}
127
128pub fn announce<R: ReadRepository>(
130 repo: &R,
131 settings: SyncSettings,
132 reporting: SyncReporting,
133 node: &mut Node,
134 profile: &Profile,
135) -> Result<Option<sync::AnnouncerResult>, SyncError> {
136 match announce_(repo, settings, reporting, node, profile) {
137 Ok(result) => Ok(result),
138 Err(e) if e.is_connection_err() => {
139 term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
140 Ok(None)
141 }
142 Err(e) => Err(e),
143 }
144}
145
146fn announce_<R>(
147 repo: &R,
148 settings: SyncSettings,
149 reporting: SyncReporting,
150 node: &mut Node,
151 profile: &Profile,
152) -> Result<Option<sync::AnnouncerResult>, SyncError>
153where
154 R: ReadRepository,
155{
156 let me = profile.id();
157 let rid = repo.id();
158 let doc = repo.identity_doc()?;
159
160 let settings = settings.with_profile(profile);
161 let n_preferred_seeds = settings.seeds.len();
162
163 let config = match sync::PrivateNetwork::private_repo(&doc) {
164 None => {
165 let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
166 (BTreeSet::new(), BTreeSet::new()),
167 |(mut synced, mut unsynced), seed| {
168 if seed.is_synced() {
169 synced.insert(seed.nid);
170 } else {
171 unsynced.insert(seed.nid);
172 }
173 (synced, unsynced)
174 },
175 );
176 sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
177 }
178 Some(network) => {
179 let sessions = node.sessions()?;
180 let network =
181 network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
182 sync::AnnouncerConfig::private(*me, settings.replicas, network)
183 }
184 };
185 let announcer = match sync::Announcer::new(config) {
186 Ok(announcer) => announcer,
187 Err(err) => match err {
188 sync::AnnouncerError::AlreadySynced(result) => {
189 term::success!(
190 &mut reporting.completion.writer();
191 "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
192 term::format::positive(result.synced()),
193 );
194 return Ok(None);
195 }
196 sync::AnnouncerError::NoSeeds => {
197 term::info!(
198 &mut reporting.completion.writer();
199 "{}",
200 term::format::yellow(format!("No seeds found for {rid}."))
201 );
202 return Ok(None);
203 }
204 sync::AnnouncerError::Target(err) => return Err(err.into()),
205 },
206 };
207 let target = announcer.target();
208 let min_replicas = target.replicas().lower_bound();
209 let mut spinner = term::spinner_to(
210 format!("Found {} seed(s)..", announcer.progress().unsynced()),
211 reporting.progress.clone(),
212 reporting.completion.clone(),
213 );
214
215 match node.announce(
216 rid,
217 [profile.did().into()],
218 settings.timeout,
219 announcer,
220 |node, progress| {
221 spinner.message(format!(
222 "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
223 term::format::node_id_human_compact(node),
224 term::format::secondary(progress.preferred()),
225 term::format::secondary(n_preferred_seeds),
226 term::format::secondary(progress.synced()),
227 term::format::secondary(min_replicas.max(progress.synced())),
230 ));
231 },
232 ) {
233 Ok(result) => {
234 spinner.message(format!(
235 "Synced with {} seed(s)",
236 term::format::positive(result.synced().len())
237 ));
238 spinner.finish();
239 Ok(Some(result))
240 }
241 Err(err) => {
242 spinner.error(format!("Sync failed: {err}"));
243 Err(err.into())
244 }
245 }
246}