1use core::time;
2use std::collections::BTreeSet;
3use std::io;
4use std::io::Write;
5
6use radicle::node::sync;
7use radicle::node::{Handle as _, NodeId};
8use radicle::storage::{ReadRepository, RepositoryError};
9use radicle::{Node, Profile};
10
11use crate::terminal as term;
12
/// Timeout used by [`SyncSettings::default`] when none is set explicitly: nine seconds.
pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct SyncSettings {
19 pub replicas: sync::ReplicationFactor,
21 pub seeds: BTreeSet<NodeId>,
23 pub timeout: time::Duration,
25}
26
27impl SyncSettings {
28 pub fn timeout(mut self, timeout: time::Duration) -> Self {
30 self.timeout = timeout;
31 self
32 }
33
34 pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
36 self.replicas = replicas;
37 self
38 }
39
40 pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
42 self.seeds = seeds.into_iter().collect();
43 self
44 }
45
46 pub fn with_profile(mut self, profile: &Profile) -> Self {
49 if self.seeds.is_empty() {
51 self.seeds = profile
52 .config
53 .preferred_seeds
54 .iter()
55 .map(|p| p.id)
56 .collect();
57 }
58 self.seeds.remove(profile.id());
60 self
61 }
62}
63
64impl Default for SyncSettings {
65 fn default() -> Self {
66 Self {
67 replicas: sync::ReplicationFactor::default(),
68 seeds: BTreeSet::new(),
69 timeout: DEFAULT_SYNC_TIMEOUT,
70 }
71 }
72}
73
74#[derive(thiserror::Error, Debug)]
76pub enum SyncError {
77 #[error(transparent)]
78 Repository(#[from] RepositoryError),
79 #[error(transparent)]
80 Node(#[from] radicle::node::Error),
81 #[error("all seeds timed out")]
82 AllSeedsTimedOut,
83 #[error(transparent)]
84 Target(#[from] sync::announce::TargetError),
85}
86
87impl SyncError {
88 fn is_connection_err(&self) -> bool {
89 match self {
90 Self::Node(e) => e.is_connection_err(),
91 _ => false,
92 }
93 }
94}
95
/// Output destination for sync reporting.
#[derive(Debug)]
pub enum SyncWriter {
    /// Write to standard output.
    Stdout(io::Stdout),
    /// Write to standard error.
    Stderr(io::Stderr),
    /// Discard everything that is written.
    Sink,
}

impl Clone for SyncWriter {
    /// Produces a writer of the same kind; stream variants obtain a fresh
    /// handle from `io::stdout()` / `io::stderr()`.
    fn clone(&self) -> Self {
        match *self {
            Self::Sink => Self::Sink,
            Self::Stderr(_) => Self::Stderr(io::stderr()),
            Self::Stdout(_) => Self::Stdout(io::stdout()),
        }
    }
}

impl io::Write for SyncWriter {
    /// Forwards to the wrapped stream; `Sink` reports the whole buffer as
    /// written without storing it.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            Self::Sink => Ok(buf.len()),
            Self::Stderr(handle) => handle.write(buf),
            Self::Stdout(handle) => handle.write(buf),
        }
    }

    /// Flushes the wrapped stream; a no-op for `Sink`.
    fn flush(&mut self) -> io::Result<()> {
        match self {
            Self::Sink => Ok(()),
            Self::Stderr(handle) => handle.flush(),
            Self::Stdout(handle) => handle.flush(),
        }
    }
}
134
135pub struct SyncReporting {
137 pub progress: SyncWriter,
139 pub completion: SyncWriter,
141 pub debug: bool,
143}
144
145impl Default for SyncReporting {
146 fn default() -> Self {
147 Self {
148 progress: SyncWriter::Stderr(io::stderr()),
149 completion: SyncWriter::Stdout(io::stdout()),
150 debug: false,
151 }
152 }
153}
154
155pub fn announce<R: ReadRepository>(
157 repo: &R,
158 settings: SyncSettings,
159 reporting: SyncReporting,
160 node: &mut Node,
161 profile: &Profile,
162) -> Result<Option<sync::AnnouncerResult>, SyncError> {
163 match announce_(repo, settings, reporting, node, profile) {
164 Ok(result) => Ok(result),
165 Err(e) if e.is_connection_err() => {
166 term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
167 Ok(None)
168 }
169 Err(e) => Err(e),
170 }
171}
172
173fn announce_<R>(
174 repo: &R,
175 settings: SyncSettings,
176 mut reporting: SyncReporting,
177 node: &mut Node,
178 profile: &Profile,
179) -> Result<Option<sync::AnnouncerResult>, SyncError>
180where
181 R: ReadRepository,
182{
183 let me = profile.id();
184 let rid = repo.id();
185 let doc = repo.identity_doc()?;
186
187 let settings = settings.with_profile(profile);
188 let n_preferred_seeds = settings.seeds.len();
189
190 let config = match sync::PrivateNetwork::private_repo(&doc) {
191 None => {
192 let (synced, unsynced) = node.seeds(rid)?.iter().fold(
193 (BTreeSet::new(), BTreeSet::new()),
194 |(mut synced, mut unsynced), seed| {
195 if seed.is_synced() {
196 synced.insert(seed.nid);
197 } else {
198 unsynced.insert(seed.nid);
199 }
200 (synced, unsynced)
201 },
202 );
203 sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
204 }
205 Some(network) => {
206 let sessions = node.sessions()?;
207 let network =
208 network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
209 sync::AnnouncerConfig::private(*me, settings.replicas, network)
210 }
211 };
212 let announcer = match sync::Announcer::new(config) {
213 Ok(announcer) => announcer,
214 Err(err) => match err {
215 sync::AnnouncerError::AlreadySynced(result) => {
216 term::success!(
217 &mut reporting.completion;
218 "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
219 term::format::positive(result.synced()),
220 );
221 return Ok(None);
222 }
223 sync::AnnouncerError::NoSeeds => {
224 term::info!(
225 &mut reporting.completion;
226 "{}",
227 term::format::yellow(format!("No seeds found for {rid}."))
228 );
229 return Ok(None);
230 }
231 sync::AnnouncerError::Target(err) => return Err(err.into()),
232 },
233 };
234 let target = announcer.target();
235 let min_replicas = target.replicas().lower_bound();
236 let mut spinner = term::spinner_to(
237 format!("Found {} seed(s)..", announcer.progress().unsynced()),
238 reporting.completion.clone(),
239 reporting.progress.clone(),
240 );
241
242 match node.announce(rid, settings.timeout, announcer, |node, progress| {
243 spinner.message(format!(
244 "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
245 term::format::node_id_human_compact(node),
246 term::format::secondary(progress.preferred()),
247 term::format::secondary(n_preferred_seeds),
248 term::format::secondary(progress.synced()),
249 term::format::secondary(min_replicas.max(progress.synced())),
252 ));
253 }) {
254 Ok(result) => {
255 spinner.message(format!(
256 "Synced with {} seed(s)",
257 term::format::positive(result.synced().len())
258 ));
259 spinner.finish();
260 Ok(Some(result))
261 }
262 Err(err) => {
263 spinner.error(format!("Sync failed: {err}"));
264 Err(err.into())
265 }
266 }
267}