1use core::time;
2use std::collections::BTreeSet;
3use std::io::Write;
4
5use radicle::node::sync;
6use radicle::node::{Handle as _, NodeId};
7use radicle::storage::{ReadRepository, RepositoryError};
8use radicle::{Node, Profile};
9
10use crate::terminal as term;
11
12pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
14
15#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct SyncSettings {
18 pub replicas: sync::ReplicationFactor,
20 pub seeds: BTreeSet<NodeId>,
22 pub timeout: time::Duration,
24}
25
26impl SyncSettings {
27 pub fn timeout(mut self, timeout: time::Duration) -> Self {
29 self.timeout = timeout;
30 self
31 }
32
33 pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
35 self.replicas = replicas;
36 self
37 }
38
39 pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
41 self.seeds = seeds.into_iter().collect();
42 self
43 }
44
45 pub fn with_profile(mut self, profile: &Profile) -> Self {
48 if self.seeds.is_empty() {
50 self.seeds = profile
51 .config
52 .preferred_seeds
53 .iter()
54 .map(|p| p.id)
55 .collect();
56 }
57 self.seeds.remove(profile.id());
59 self
60 }
61}
62
63impl Default for SyncSettings {
64 fn default() -> Self {
65 Self {
66 replicas: sync::ReplicationFactor::default(),
67 seeds: BTreeSet::new(),
68 timeout: DEFAULT_SYNC_TIMEOUT,
69 }
70 }
71}
72
73#[derive(thiserror::Error, Debug)]
75pub enum SyncError {
76 #[error(transparent)]
77 Repository(#[from] RepositoryError),
78 #[error(transparent)]
79 Node(#[from] radicle::node::Error),
80 #[error("all seeds timed out")]
81 AllSeedsTimedOut,
82 #[error(transparent)]
83 Target(#[from] sync::announce::TargetError),
84}
85
86impl SyncError {
87 fn is_connection_err(&self) -> bool {
88 match self {
89 Self::Node(e) => e.is_connection_err(),
90 _ => false,
91 }
92 }
93}
94
95pub struct SyncReporting {
97 pub progress: term::PaintTarget,
99 pub completion: term::PaintTarget,
101 pub debug: bool,
103}
104
105impl Default for SyncReporting {
106 fn default() -> Self {
107 Self {
108 progress: term::PaintTarget::Stderr,
109 completion: term::PaintTarget::Stdout,
110 debug: false,
111 }
112 }
113}
114
115pub fn announce<R: ReadRepository>(
117 repo: &R,
118 settings: SyncSettings,
119 reporting: SyncReporting,
120 node: &mut Node,
121 profile: &Profile,
122) -> Result<Option<sync::AnnouncerResult>, SyncError> {
123 match announce_(repo, settings, reporting, node, profile) {
124 Ok(result) => Ok(result),
125 Err(e) if e.is_connection_err() => {
126 term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
127 Ok(None)
128 }
129 Err(e) => Err(e),
130 }
131}
132
133fn announce_<R>(
134 repo: &R,
135 settings: SyncSettings,
136 reporting: SyncReporting,
137 node: &mut Node,
138 profile: &Profile,
139) -> Result<Option<sync::AnnouncerResult>, SyncError>
140where
141 R: ReadRepository,
142{
143 let me = profile.id();
144 let rid = repo.id();
145 let doc = repo.identity_doc()?;
146
147 let settings = settings.with_profile(profile);
148 let n_preferred_seeds = settings.seeds.len();
149
150 let config = match sync::PrivateNetwork::private_repo(&doc) {
151 None => {
152 let (synced, unsynced) = node.seeds(rid)?.iter().fold(
153 (BTreeSet::new(), BTreeSet::new()),
154 |(mut synced, mut unsynced), seed| {
155 if seed.is_synced() {
156 synced.insert(seed.nid);
157 } else {
158 unsynced.insert(seed.nid);
159 }
160 (synced, unsynced)
161 },
162 );
163 sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
164 }
165 Some(network) => {
166 let sessions = node.sessions()?;
167 let network =
168 network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
169 sync::AnnouncerConfig::private(*me, settings.replicas, network)
170 }
171 };
172 let announcer = match sync::Announcer::new(config) {
173 Ok(announcer) => announcer,
174 Err(err) => match err {
175 sync::AnnouncerError::AlreadySynced(result) => {
176 term::success!(
177 &mut reporting.completion.writer();
178 "Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
179 term::format::positive(result.synced()),
180 );
181 return Ok(None);
182 }
183 sync::AnnouncerError::NoSeeds => {
184 term::info!(
185 &mut reporting.completion.writer();
186 "{}",
187 term::format::yellow(format!("No seeds found for {rid}."))
188 );
189 return Ok(None);
190 }
191 sync::AnnouncerError::Target(err) => return Err(err.into()),
192 },
193 };
194 let target = announcer.target();
195 let min_replicas = target.replicas().lower_bound();
196 let mut spinner = term::spinner_to(
197 format!("Found {} seed(s)..", announcer.progress().unsynced()),
198 reporting.progress.clone(),
199 reporting.completion.clone(),
200 );
201
202 match node.announce(rid, settings.timeout, announcer, |node, progress| {
203 spinner.message(format!(
204 "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
205 term::format::node_id_human_compact(node),
206 term::format::secondary(progress.preferred()),
207 term::format::secondary(n_preferred_seeds),
208 term::format::secondary(progress.synced()),
209 term::format::secondary(min_replicas.max(progress.synced())),
212 ));
213 }) {
214 Ok(result) => {
215 spinner.message(format!(
216 "Synced with {} seed(s)",
217 term::format::positive(result.synced().len())
218 ));
219 spinner.finish();
220 Ok(Some(result))
221 }
222 Err(err) => {
223 spinner.error(format!("Sync failed: {err}"));
224 Err(err.into())
225 }
226 }
227}