1use core::time;
2use std::collections::BTreeSet;
3use std::io::Write;
4
5use radicle::node::sync;
6use radicle::node::{Handle as _, NodeId};
7use radicle::storage::{ReadRepository, RepositoryError};
8use radicle::{Node, Profile};
9
10use crate::terminal as term;
11
/// Default sync timeout (9 seconds), used by [`SyncSettings::default`].
pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
14
/// Settings that control how a repository is announced to the network.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncSettings {
    /// Replication factor handed to the announcer configuration.
    pub replicas: sync::ReplicationFactor,
    /// Seeds to announce to. When left empty, [`SyncSettings::with_profile`]
    /// fills this set from the profile's preferred seeds.
    pub seeds: BTreeSet<NodeId>,
    /// Timeout passed to the node when announcing.
    pub timeout: time::Duration,
}
25
26impl SyncSettings {
27 #[must_use]
29 pub fn timeout(mut self, timeout: time::Duration) -> Self {
30 self.timeout = timeout;
31 self
32 }
33
34 #[must_use]
36 pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
37 self.replicas = replicas;
38 self
39 }
40
41 pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
43 self.seeds = seeds.into_iter().collect();
44 self
45 }
46
47 #[must_use]
50 pub fn with_profile(mut self, profile: &Profile) -> Self {
51 if self.seeds.is_empty() {
53 self.seeds = profile
54 .config
55 .preferred_seeds
56 .iter()
57 .map(|p| p.id)
58 .collect();
59 }
60 self.seeds.remove(profile.id());
62 self
63 }
64}
65
66impl Default for SyncSettings {
67 fn default() -> Self {
68 Self {
69 replicas: sync::ReplicationFactor::default(),
70 seeds: BTreeSet::new(),
71 timeout: DEFAULT_SYNC_TIMEOUT,
72 }
73 }
74}
75
/// Errors that can occur while announcing a repository.
#[derive(thiserror::Error, Debug)]
pub enum SyncError {
    /// A repository operation failed (e.g. reading the identity document).
    #[error(transparent)]
    Repository(#[from] RepositoryError),
    /// A request to the node failed.
    #[error(transparent)]
    Node(#[from] radicle::node::Error),
    /// All seeds timed out.
    #[error("all seeds timed out")]
    AllSeedsTimedOut,
    /// The announcer rejected its configuration with a target error.
    #[error(transparent)]
    Target(#[from] sync::announce::TargetError),
}
88
89impl SyncError {
90 fn is_connection_err(&self) -> bool {
91 match self {
92 Self::Node(e) => e.is_connection_err(),
93 Self::Repository(_) | Self::AllSeedsTimedOut | Self::Target(_) => false,
94 }
95 }
96}
97
/// Controls where sync output is written.
pub struct SyncReporting {
    /// Target handed to the spinner as its progress output.
    pub progress: term::PaintTarget,
    /// Target for completion messages, also handed to the spinner as its
    /// completion output.
    pub completion: term::PaintTarget,
    /// NOTE(review): presumably toggles debug output; not read anywhere in
    /// the visible code — confirm against callers.
    pub debug: bool,
}
107
108impl Default for SyncReporting {
109 fn default() -> Self {
110 Self {
111 progress: term::PaintTarget::Stderr,
112 completion: term::PaintTarget::Stdout,
113 debug: false,
114 }
115 }
116}
117
118pub fn announce<R: ReadRepository>(
120 repo: &R,
121 settings: SyncSettings,
122 reporting: SyncReporting,
123 node: &mut Node,
124 profile: &Profile,
125) -> Result<Option<sync::AnnouncerResult>, SyncError> {
126 match announce_(repo, settings, reporting, node, profile) {
127 Ok(result) => Ok(result),
128 Err(e) if e.is_connection_err() => {
129 term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
130 Ok(None)
131 }
132 Err(e) => Err(e),
133 }
134}
135
136fn announce_<R>(
137 repo: &R,
138 settings: SyncSettings,
139 reporting: SyncReporting,
140 node: &mut Node,
141 profile: &Profile,
142) -> Result<Option<sync::AnnouncerResult>, SyncError>
143where
144 R: ReadRepository,
145{
146 let me = profile.id();
147 let rid = repo.id();
148 let doc = repo.identity_doc()?;
149
150 let settings = settings.with_profile(profile);
151 let n_preferred_seeds = settings.seeds.len();
152
153 let config = match sync::PrivateNetwork::private_repo(&doc) {
154 None => {
155 let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
156 (BTreeSet::new(), BTreeSet::new()),
157 |(mut synced, mut unsynced), seed| {
158 if seed.is_synced() {
159 synced.insert(seed.nid);
160 } else {
161 unsynced.insert(seed.nid);
162 }
163 (synced, unsynced)
164 },
165 );
166 sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
167 }
168 Some(network) => {
169 let sessions = node.sessions()?;
170 let network =
171 network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
172 sync::AnnouncerConfig::private(*me, settings.replicas, network)
173 }
174 };
175 let announcer = match sync::Announcer::new(config) {
176 Ok(announcer) => announcer,
177 Err(err) => match err {
178 sync::AnnouncerError::AlreadySynced(result) => {
179 term::success!(
180 &mut reporting.completion.writer();
181 "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
182 term::format::positive(result.synced()),
183 );
184 return Ok(None);
185 }
186 sync::AnnouncerError::NoSeeds => {
187 term::info!(
188 &mut reporting.completion.writer();
189 "{}",
190 term::format::yellow(format!("No seeds found for {rid}."))
191 );
192 return Ok(None);
193 }
194 sync::AnnouncerError::Target(err) => return Err(err.into()),
195 },
196 };
197 let target = announcer.target();
198 let min_replicas = target.replicas().lower_bound();
199 let mut spinner = term::spinner_to(
200 format!("Found {} seed(s)..", announcer.progress().unsynced()),
201 reporting.progress.clone(),
202 reporting.completion.clone(),
203 );
204
205 match node.announce(
206 rid,
207 [profile.did().into()],
208 settings.timeout,
209 announcer,
210 |node, progress| {
211 spinner.message(format!(
212 "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
213 term::format::node_id_human_compact(node),
214 term::format::secondary(progress.preferred()),
215 term::format::secondary(n_preferred_seeds),
216 term::format::secondary(progress.synced()),
217 term::format::secondary(min_replicas.max(progress.synced())),
220 ));
221 },
222 ) {
223 Ok(result) => {
224 spinner.message(format!(
225 "Synced with {} seed(s)",
226 term::format::positive(result.synced().len())
227 ));
228 spinner.finish();
229 Ok(Some(result))
230 }
231 Err(err) => {
232 spinner.error(format!("Sync failed: {err}"));
233 Err(err.into())
234 }
235 }
236}