1mod args;
2
3use std::cmp::Ordering;
4use std::collections::BTreeMap;
5use std::collections::HashSet;
6use std::time;
7
8use anyhow::anyhow;
9
10use radicle::node;
11use radicle::node::SyncedAt;
12use radicle::node::address::Store;
13use radicle::node::sync;
14use radicle::node::sync::fetch::SuccessfulOutcome;
15use radicle::node::{AliasStore, Handle as _, Node, Seed, SyncStatus};
16use radicle::prelude::{NodeId, Profile, RepoId};
17use radicle::storage::ReadRepository;
18use radicle::storage::RefUpdate;
19use radicle::storage::{ReadStorage, RemoteRepository};
20use radicle_term::Element;
21
22use crate::node::SyncReporting;
23use crate::node::SyncSettings;
24use crate::terminal as term;
25use crate::terminal::args::rid_or_cwd;
26use crate::terminal::format::Author;
27use crate::terminal::{Table, TableOptions};
28
29pub use args::Args;
30use args::{Command, SortBy, SyncDirection, SyncMode};
31
32pub fn run(args: Args, ctx: impl term::Context) -> anyhow::Result<()> {
33 let profile = ctx.profile()?;
34 let mut node = radicle::Node::new(profile.socket_from_env());
35 if !node.is_running() {
36 anyhow::bail!(
37 "to sync a repository, your node must be running. To start it, run `rad node start`"
38 );
39 }
40 let verbose = args.verbose;
41 let debug = args.verbose;
42
43 match args.command {
44 Some(Command::Status { repo, sort_by }) => {
45 let (_, rid) = rid_or_cwd(repo)?;
46 sync_status(rid, &mut node, &profile, &sort_by, verbose)?;
47 }
48 None => match SyncMode::from(args.sync) {
49 SyncMode::Repo {
50 repo,
51 settings,
52 direction,
53 } => {
54 let (_, rid) = rid_or_cwd(repo)?;
55 let settings = settings.clone().with_profile(&profile);
56
57 if matches!(direction, SyncDirection::Fetch | SyncDirection::Both) {
58 if !profile.policies()?.is_seeding(&rid)? {
59 anyhow::bail!("repository {rid} is not seeded");
60 }
61 let result = fetch(rid, settings.clone(), &mut node, &profile)?;
62 display_fetch_result(&result, verbose)
63 }
64 if matches!(direction, SyncDirection::Announce | SyncDirection::Both) {
65 announce_refs(rid, settings, &mut node, &profile, verbose, debug)?;
66 }
67 }
68 SyncMode::Inventory => {
69 announce_inventory(node)?;
70 }
71 },
72 }
73
74 Ok(())
75}
76
77fn sync_status(
78 rid: RepoId,
79 node: &mut Node,
80 profile: &Profile,
81 sort_by: &SortBy,
82 verbose: bool,
83) -> anyhow::Result<()> {
84 const SYMBOL_STATE: &str = "?";
85 const SYMBOL_STATE_UNKNOWN: &str = "•";
86
87 let mut table = Table::<5, term::Label>::new(TableOptions::bordered());
88 let mut seeds: Vec<_> = node.seeds_for(rid, [*profile.did()])?.into();
89 let local_nid = node.nid()?;
90 let aliases = profile.aliases();
91
92 table.header([
93 term::format::bold("Node ID").into(),
94 term::format::bold("Alias").into(),
95 term::format::bold(SYMBOL_STATE).into(),
96 term::format::bold("SigRefs").into(),
97 term::format::bold("Timestamp").into(),
98 ]);
99 table.divider();
100
101 sort_seeds_by(local_nid, &mut seeds, &aliases, sort_by);
102
103 let seeds = seeds.into_iter().flat_map(|seed| {
104 let (status, head, time) = match seed.sync {
105 Some(SyncStatus::Synced {
106 at: SyncedAt { oid, timestamp },
107 }) => (
108 term::PREFIX_SUCCESS,
109 term::format::oid(oid),
110 term::format::timestamp(timestamp),
111 ),
112 Some(SyncStatus::OutOfSync {
113 remote: SyncedAt { timestamp, .. },
114 local,
115 ..
116 }) if seed.nid == local_nid => (
117 term::PREFIX_WARNING,
118 term::format::oid(local.oid),
119 term::format::timestamp(timestamp),
120 ),
121 Some(SyncStatus::OutOfSync {
122 remote: SyncedAt { oid, timestamp },
123 ..
124 }) => (
125 term::PREFIX_ERROR,
126 term::format::oid(oid),
127 term::format::timestamp(timestamp),
128 ),
129 None if verbose => (
130 term::format::dim(SYMBOL_STATE_UNKNOWN),
131 term::paint(String::new()),
132 term::paint(String::new()),
133 ),
134 None => return None,
135 };
136
137 let (alias, nid) = Author::new(&seed.nid, profile, verbose).labels();
138
139 Some([
140 nid,
141 alias,
142 status.into(),
143 term::format::secondary(head).into(),
144 time.dim().italic().into(),
145 ])
146 });
147
148 table.extend(seeds);
149 table.print();
150
151 if profile.hints() {
152 const COLUMN_WIDTH: usize = 16;
153 let status = format!(
154 "\n{:>4} … {}\n {} {}\n {} {}",
155 term::Paint::from(SYMBOL_STATE.to_string()).fg(radicle_term::Color::White),
156 term::format::dim("Status:"),
157 format_args!(
158 "{} {:width$}",
159 term::PREFIX_SUCCESS,
160 term::format::dim("… in sync"),
161 width = COLUMN_WIDTH,
162 ),
163 format_args!(
164 "{} {}",
165 term::PREFIX_ERROR,
166 term::format::dim("… out of sync")
167 ),
168 format_args!(
169 "{} {:width$}",
170 term::PREFIX_WARNING,
171 term::format::dim("… not announced"),
172 width = COLUMN_WIDTH,
173 ),
174 format_args!(
175 "{} {}",
176 term::format::dim(SYMBOL_STATE_UNKNOWN),
177 term::format::dim("… unknown")
178 ),
179 );
180 term::hint(status);
181 }
182
183 Ok(())
184}
185
186fn announce_refs(
187 rid: RepoId,
188 settings: SyncSettings,
189 node: &mut Node,
190 profile: &Profile,
191 verbose: bool,
192 debug: bool,
193) -> anyhow::Result<()> {
194 let Ok(repo) = profile.storage.repository(rid) else {
195 return Err(anyhow!(
196 "nothing to announce, repository {rid} is not available locally"
197 ));
198 };
199 if let Err(e) = repo.remote(&profile.public_key) {
200 if e.is_not_found() {
201 term::println(term::format::italic(
202 "Nothing to announce, you don't have a fork of this repository.",
203 ));
204 return Ok(());
205 } else {
206 return Err(anyhow!("failed to load local fork of {rid}: {e}"));
207 }
208 }
209
210 let result = crate::node::announce(
211 &repo,
212 settings,
213 SyncReporting {
214 debug,
215 ..SyncReporting::default()
216 },
217 node,
218 profile,
219 )?;
220 if let Some(result) = result {
221 print_announcer_result(&result, verbose)
222 }
223
224 Ok(())
225}
226
227pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
228 let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count();
229 let spinner = term::spinner(format!("Announcing inventory to {peers} peers.."));
230
231 node.announce_inventory()?;
232 spinner.finish();
233
234 Ok(())
235}
236
237#[derive(Debug, thiserror::Error)]
238pub enum FetchError {
239 #[error(transparent)]
240 Node(#[from] node::Error),
241 #[error(transparent)]
242 Db(#[from] node::db::Error),
243 #[error(transparent)]
244 Address(#[from] node::address::Error),
245 #[error(transparent)]
246 Fetcher(#[from] sync::FetcherError),
247}
248
249pub fn fetch(
250 rid: RepoId,
251 settings: SyncSettings,
252 node: &mut Node,
253 profile: &Profile,
254) -> Result<sync::FetcherResult, FetchError> {
255 let db = profile.database()?;
256 let local = profile.id();
257 let is_private = profile.storage.repository(rid).ok().and_then(|repo| {
258 let doc = repo.identity_doc().ok()?.doc;
259 sync::PrivateNetwork::private_repo(&doc)
260 });
261 let config = match is_private {
262 Some(private) => sync::FetcherConfig::private(private, settings.replicas, *local),
263 None => {
264 let seeds = node.seeds_for(rid, [*profile.did()])?;
267 let (connected, disconnected) = seeds.partition();
268 let candidates = connected
269 .into_iter()
270 .map(|seed| seed.nid)
271 .chain(disconnected.into_iter().filter_map(|seed| {
272 (!seed.addrs.is_empty()).then_some(seed.nid)
274 }))
275 .map(sync::fetch::Candidate::new);
276 sync::FetcherConfig::public(settings.seeds.clone(), settings.replicas, *local)
277 .with_candidates(candidates)
278 }
279 };
280 let mut fetcher = sync::Fetcher::new(config)?;
281
282 let mut progress = fetcher.progress();
283 term::info!(
284 "Fetching {} from the network, found {} potential seed(s).",
285 term::format::tertiary(rid),
286 term::format::tertiary(progress.candidate())
287 );
288 let mut spinner = FetcherSpinner::new(fetcher.target(), &progress);
289
290 while let Some(nid) = fetcher.next_node() {
291 match node.session(nid)? {
292 Some(session) if session.is_connected() => fetcher.ready_to_fetch(nid, session.addr),
293 _ => {
294 let addrs = db.addresses_of(&nid)?;
295 if addrs.is_empty() {
296 fetcher.fetch_failed(nid, "Could not connect. No addresses known.");
297 } else if let Some(addr) = connect(
298 nid,
299 addrs.into_iter().map(|ka| ka.addr),
300 settings.timeout,
301 node,
302 &mut spinner,
303 &fetcher.progress(),
304 ) {
305 fetcher.ready_to_fetch(nid, addr)
306 } else {
307 fetcher
308 .fetch_failed(nid, "Could not connect. At least one address is known but all attempts timed out.");
309 }
310 }
311 }
312 if let Some((nid, addr)) = fetcher.next_fetch() {
313 spinner.emit_fetching(&nid, &addr, &progress);
314 let result = node.fetch(
315 rid,
316 nid,
317 settings.timeout,
318 settings.signed_references_minimum_feature_level,
319 )?;
320 match fetcher.fetch_complete(nid, result) {
321 std::ops::ControlFlow::Continue(update) => {
322 spinner.emit_progress(&update);
323 progress = update
324 }
325 std::ops::ControlFlow::Break(success) => {
326 spinner.finished(success.outcome());
327 return Ok(sync::FetcherResult::TargetReached(success));
328 }
329 }
330 }
331 }
332 let result = fetcher.finish();
333 match &result {
334 sync::FetcherResult::TargetReached(success) => {
335 spinner.finished(success.outcome());
336 }
337 sync::FetcherResult::TargetError(missed) => spinner.failed(missed),
338 }
339 Ok(result)
340}
341
342fn connect(
346 nid: NodeId,
347 addrs: impl Iterator<Item = node::Address>,
348 timeout: time::Duration,
349 node: &mut Node,
350 spinner: &mut FetcherSpinner,
351 progress: &sync::fetch::Progress,
352) -> Option<node::Address> {
353 for addr in addrs {
354 spinner.emit_dialing(&nid, &addr, progress);
355 let cr = node.connect(
356 nid,
357 addr.clone(),
358 node::ConnectOptions {
359 persistent: false,
360 timeout,
361 },
362 );
363
364 match cr {
365 Ok(node::ConnectResult::Connected) => {
366 return Some(addr);
367 }
368 Ok(node::ConnectResult::Disconnected { .. }) => {
369 continue;
370 }
371 Err(e) => {
372 log::warn!(target: "cli", "Failed to connect to {nid}@{addr}: {e}");
373 continue;
374 }
375 }
376 }
377 None
378}
379
380fn sort_seeds_by(local: NodeId, seeds: &mut [Seed], aliases: &impl AliasStore, sort_by: &SortBy) {
381 let compare = |a: &Seed, b: &Seed| match sort_by {
382 SortBy::Nid => a.nid.cmp(&b.nid),
383 SortBy::Alias => {
384 let a = aliases.alias(&a.nid);
385 let b = aliases.alias(&b.nid);
386 a.cmp(&b)
387 }
388 SortBy::Status => match (&a.sync, &b.sync) {
389 (Some(_), None) => Ordering::Less,
390 (None, Some(_)) => Ordering::Greater,
391 (Some(a), Some(b)) => a.cmp(b).reverse(),
392 (None, None) => Ordering::Equal,
393 },
394 };
395
396 seeds.sort_by(|a, b| {
398 if a.nid == local {
399 Ordering::Less
400 } else if b.nid == local {
401 Ordering::Greater
402 } else {
403 compare(a, b)
404 }
405 });
406}
407
408struct FetcherSpinner {
409 preferred_seeds: usize,
410 replicas: sync::ReplicationFactor,
411 spinner: term::Spinner,
412}
413
414impl FetcherSpinner {
415 fn new(target: &sync::fetch::Target, progress: &sync::fetch::Progress) -> Self {
416 let preferred_seeds = target.preferred_seeds().len();
417 let replicas = target.replicas();
418 let spinner = term::spinner(format!(
419 "{} of {} preferred seeds, and {} of at least {} total seeds.",
420 term::format::secondary(progress.preferred()),
421 term::format::secondary(preferred_seeds),
422 term::format::secondary(progress.succeeded()),
423 term::format::secondary(replicas.lower_bound())
424 ));
425 Self {
426 preferred_seeds: target.preferred_seeds().len(),
427 replicas: *target.replicas(),
428 spinner,
429 }
430 }
431
432 fn emit_progress(&mut self, progress: &sync::fetch::Progress) {
433 self.spinner.message(format!(
434 "{} of {} preferred seeds, and {} of at least {} total seeds.",
435 term::format::secondary(progress.preferred()),
436 term::format::secondary(self.preferred_seeds),
437 term::format::secondary(progress.succeeded()),
438 term::format::secondary(self.replicas.lower_bound()),
439 ))
440 }
441
442 fn emit_fetching(
443 &mut self,
444 node: &NodeId,
445 addr: &node::Address,
446 progress: &sync::fetch::Progress,
447 ) {
448 self.spinner.message(format!(
449 "{} of {} preferred seeds, and {} of at least {} total seeds… [fetch {}@{}]",
450 term::format::secondary(progress.preferred()),
451 term::format::secondary(self.preferred_seeds),
452 term::format::secondary(progress.succeeded()),
453 term::format::secondary(self.replicas.lower_bound()),
454 term::format::tertiary(term::format::node_id_human_compact(node)),
455 term::format::tertiary(addr.display_compact()),
456 ))
457 }
458
459 fn emit_dialing(
460 &mut self,
461 node: &NodeId,
462 addr: &node::Address,
463 progress: &sync::fetch::Progress,
464 ) {
465 self.spinner.message(format!(
466 "{} of {} preferred seeds, and {} of at least {} total seeds… [dial {}@{}]",
467 term::format::secondary(progress.preferred()),
468 term::format::secondary(self.preferred_seeds),
469 term::format::secondary(progress.succeeded()),
470 term::format::secondary(self.replicas.lower_bound()),
471 term::format::tertiary(term::format::node_id_human_compact(node)),
472 term::format::tertiary(addr.display_compact()),
473 ))
474 }
475
476 fn finished(mut self, outcome: &SuccessfulOutcome) {
477 match outcome {
478 SuccessfulOutcome::PreferredNodes { preferred } => {
479 self.spinner.message(format!(
480 "Target met: {} preferred seed(s).",
481 term::format::positive(preferred),
482 ));
483 }
484 SuccessfulOutcome::MinReplicas { succeeded, .. } => {
485 self.spinner.message(format!(
486 "Target met: {} seed(s)",
487 term::format::positive(succeeded)
488 ));
489 }
490 SuccessfulOutcome::MaxReplicas {
491 succeeded,
492 min,
493 max,
494 } => {
495 self.spinner.message(format!(
496 "Target met: {} of {} min and {} max seed(s)",
497 succeeded,
498 term::format::secondary(min),
499 term::format::secondary(max)
500 ));
501 }
502 }
503 self.spinner.finish()
504 }
505
506 fn failed(mut self, missed: &sync::fetch::TargetMissed) {
507 let mut message = "Target not met: ".to_string();
508 let missing_preferred_seeds = missed
509 .missed_nodes()
510 .iter()
511 .map(|nid| term::format::node_id_human(nid).to_string())
512 .collect::<Vec<_>>();
513 let required = missed.required_nodes();
514 if !missing_preferred_seeds.is_empty() {
515 message.push_str(&format!(
516 "could not fetch from [{}], and required {} more seed(s)",
517 missing_preferred_seeds.join(", "),
518 required
519 ));
520 } else {
521 message.push_str(&format!("required {required} more seed(s)"));
522 }
523 self.spinner.message(message);
524 self.spinner.failed();
525 }
526}
527
528fn display_fetch_result(result: &sync::FetcherResult, verbose: bool) {
529 match result {
530 sync::FetcherResult::TargetReached(success) => {
531 let progress = success.progress();
532 let results = success.fetch_results();
533 display_success(results.success(), verbose);
534 let failed = progress.failed();
535 if failed > 0 && verbose {
536 term::warning(format!("Failed to fetch from {failed} seed(s)."));
537 for (node, reason) in results.failed() {
538 term::warning(format!(
539 "{}: {}",
540 term::format::node_id_human(node),
541 term::format::yellow(reason),
542 ))
543 }
544 }
545 }
546 sync::FetcherResult::TargetError(failed) => {
547 let results = failed.fetch_results();
548 let progress = failed.progress();
549 let target = failed.target();
550 let succeeded = progress.succeeded();
551 let missed = failed.missed_nodes();
552 term::error(format!(
553 "Fetched from {} preferred seed(s), could not reach {} seed(s)",
554 succeeded,
555 target.replicas().lower_bound(),
556 ));
557 term::error(format!(
558 "Could not replicate from {} preferred seed(s)",
559 missed.len()
560 ));
561 for (node, reason) in results.failed() {
562 term::error(format!(
563 "{}: {}",
564 term::format::node_id_human(node),
565 term::format::negative(reason),
566 ))
567 }
568 if succeeded > 0 {
569 term::info!("Successfully fetched from the following seeds:");
570 display_success(results.success(), verbose)
571 }
572 }
573 }
574}
575
576fn display_success<'a>(
577 results: impl Iterator<Item = (&'a NodeId, &'a [RefUpdate], HashSet<NodeId>)>,
578 verbose: bool,
579) {
580 for (node, updates, _) in results {
581 term::println_prefixed(
582 "🌱 Fetched from",
583 term::format::secondary(term::format::node_id_human(node)),
584 );
585 if verbose {
586 let mut updates = updates
587 .iter()
588 .filter(|up| !matches!(up, RefUpdate::Skipped { .. }))
589 .peekable();
590 if updates.peek().is_none() {
591 term::indented(term::format::italic("no references were updated"));
592 } else {
593 for update in updates {
594 term::indented(term::format::ref_update_verbose(update))
595 }
596 }
597 }
598 }
599}
600
601fn print_announcer_result(result: &sync::AnnouncerResult, verbose: bool) {
602 use sync::announce::SuccessfulOutcome::*;
603 match result {
604 sync::AnnouncerResult::Success(success) if verbose => {
605 match success.outcome() {
608 MinReplicationFactor { preferred, synced }
609 | MaxReplicationFactor { preferred, synced }
610 | PreferredNodes {
611 preferred,
612 total_nodes_synced: synced,
613 } => {
614 if preferred == 0 {
615 term::success!("Synced {} seed(s)", term::format::positive(synced));
616 } else {
617 term::success!(
618 "Synced {} preferred seed(s) and {} total seed(s)",
619 term::format::positive(preferred),
620 term::format::positive(synced)
621 );
622 }
623 }
624 }
625 print_synced(success.synced());
626 }
627 sync::AnnouncerResult::Success(_) => {
628 }
630 sync::AnnouncerResult::TimedOut(result) => {
631 if result.synced().is_empty() {
632 term::error("All seeds timed out, use `rad sync -v` to see the list of seeds");
633 return;
634 }
635 let timed_out = result.timed_out();
636 term::warning(format!(
637 "{} seed(s) timed out, use `rad sync -v` to see the list of seeds",
638 timed_out.len(),
639 ));
640 if verbose {
641 print_synced(result.synced());
642 for node in timed_out {
643 term::warning(format!("{} timed out", term::format::node_id_human(node)));
644 }
645 }
646 }
647 sync::AnnouncerResult::NoNodes(result) => {
648 term::info!("Announcement could not sync with anymore seeds.");
649 if verbose {
650 print_synced(result.synced())
651 }
652 }
653 }
654}
655
656fn print_synced(synced: &BTreeMap<NodeId, sync::announce::SyncStatus>) {
657 for (node, status) in synced.iter() {
658 let mut message = format!("🌱 Synced with {}", term::format::node_id_human(node));
659
660 match status {
661 sync::announce::SyncStatus::AlreadySynced => {
662 message.push_str(&format!("{}", term::format::dim(" (already in sync)")));
663 }
664 sync::announce::SyncStatus::Synced { duration } => {
665 message.push_str(&format!(
666 "{}",
667 term::format::dim(format!(" in {}s", duration.as_secs()))
668 ));
669 }
670 }
671 term::info!("{}", message);
672 }
673}