1use std::cmp::Ordering;
2use std::collections::BTreeMap;
3use std::collections::BTreeSet;
4use std::collections::HashSet;
5use std::ffi::OsString;
6use std::str::FromStr;
7use std::time;
8
9use anyhow::{anyhow, Context as _};
10
11use radicle::node;
12use radicle::node::address::Store;
13use radicle::node::sync;
14use radicle::node::sync::fetch::SuccessfulOutcome;
15use radicle::node::SyncedAt;
16use radicle::node::{AliasStore, Handle as _, Node, Seed, SyncStatus};
17use radicle::prelude::{NodeId, Profile, RepoId};
18use radicle::storage::ReadRepository;
19use radicle::storage::RefUpdate;
20use radicle::storage::{ReadStorage, RemoteRepository};
21use radicle_term::Element;
22
23use crate::node::SyncReporting;
24use crate::node::SyncSettings;
25use crate::terminal as term;
26use crate::terminal::args::{Args, Error, Help};
27use crate::terminal::format::Author;
28use crate::terminal::{Table, TableOptions};
29
30pub const HELP: Help = Help {
31 name: "sync",
32 description: "Sync repositories to the network",
33 version: env!("RADICLE_VERSION"),
34 usage: r#"
35Usage
36
37 rad sync [--fetch | --announce] [<rid>] [<option>...]
38 rad sync --inventory [<option>...]
39 rad sync status [<rid>] [<option>...]
40
41 By default, the current repository is synchronized both ways.
42 If an <rid> is specified, that repository is synced instead.
43
44 The process begins by fetching changes from connected seeds,
45 followed by announcing local refs to peers, thereby prompting
46 them to fetch from us.
47
48 When `--fetch` is specified, any number of seeds may be given
49 using the `--seed` option, eg. `--seed <nid>@<addr>:<port>`.
50
51 When `--replicas` is specified, the given replication factor will try
52 to be matched. For example, `--replicas 5` will sync with 5 seeds.
53
54 The synchronization process can be configured using `--replicas <min>` and
55 `--replicas-max <max>`. If these options are used independently, then the
56 replication factor is taken as the given `<min>`/`<max>` value. If the
57 options are used together, then the replication factor has a minimum and
58 maximum bound.
59
60 For fetching, the synchronization process will be considered successful if
61 at least `<min>` seeds were fetched from *or* all preferred seeds were
62 fetched from. If `<max>` is specified then the process will continue and
63 attempt to sync with `<max>` seeds.
64
65 For reference announcing, the synchronization process will be considered
66 successful if at least `<min>` seeds were pushed to *and* all preferred
67 seeds were pushed to.
68
69 When `--fetch` or `--announce` are specified on their own, this command
70 will only fetch or announce.
71
72 If `--inventory` is specified, the node's inventory is announced to
73 the network. This mode does not take an `<rid>`.
74
75Commands
76
77 status Display the sync status of a repository
78
79Options
80
81 --sort-by <field> Sort the table by column (options: nid, alias, status)
82 -f, --fetch Turn on fetching (default: true)
83 -a, --announce Turn on ref announcing (default: true)
84 -i, --inventory Turn on inventory announcing (default: false)
85 --timeout <secs> How many seconds to wait while syncing
86 --seed <nid> Sync with the given node (may be specified multiple times)
87 -r, --replicas <count> Sync with a specific number of seeds
88 --replicas-max <count> Sync with an upper bound number of seeds
89 -v, --verbose Verbose output
90 --debug Print debug information afer sync
91 --help Print help
92"#,
93};
94
95#[derive(Debug, Clone, PartialEq, Eq, Default)]
96pub enum Operation {
97 Synchronize(SyncMode),
98 #[default]
99 Status,
100}
101
102#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
103pub enum SortBy {
104 Nid,
105 Alias,
106 #[default]
107 Status,
108}
109
110impl FromStr for SortBy {
111 type Err = &'static str;
112
113 fn from_str(s: &str) -> Result<Self, Self::Err> {
114 match s {
115 "nid" => Ok(Self::Nid),
116 "alias" => Ok(Self::Alias),
117 "status" => Ok(Self::Status),
118 _ => Err("invalid `--sort-by` field"),
119 }
120 }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub enum SyncMode {
125 Repo {
126 settings: SyncSettings,
127 direction: SyncDirection,
128 },
129 Inventory,
130}
131
132impl Default for SyncMode {
133 fn default() -> Self {
134 Self::Repo {
135 settings: SyncSettings::default(),
136 direction: SyncDirection::default(),
137 }
138 }
139}
140
141#[derive(Debug, Default, PartialEq, Eq, Clone)]
142pub enum SyncDirection {
143 Fetch,
144 Announce,
145 #[default]
146 Both,
147}
148
149#[derive(Default, Debug)]
150pub struct Options {
151 pub rid: Option<RepoId>,
152 pub debug: bool,
153 pub verbose: bool,
154 pub sort_by: SortBy,
155 pub op: Operation,
156}
157
158impl Args for Options {
159 fn from_args(args: Vec<OsString>) -> anyhow::Result<(Self, Vec<OsString>)> {
160 use lexopt::prelude::*;
161
162 let mut parser = lexopt::Parser::from_args(args);
163 let mut verbose = false;
164 let mut timeout = time::Duration::from_secs(9);
165 let mut rid = None;
166 let mut fetch = false;
167 let mut announce = false;
168 let mut inventory = false;
169 let mut debug = false;
170 let mut replicas = None;
171 let mut max_replicas = None;
172 let mut seeds = BTreeSet::new();
173 let mut sort_by = SortBy::default();
174 let mut op: Option<Operation> = None;
175
176 while let Some(arg) = parser.next()? {
177 match arg {
178 Long("debug") => {
179 debug = true;
180 }
181 Long("verbose") | Short('v') => {
182 verbose = true;
183 }
184 Long("fetch") | Short('f') => {
185 fetch = true;
186 }
187 Long("replicas") | Short('r') => {
188 let val = parser.value()?;
189 let count = term::args::number(&val)?;
190
191 if count == 0 {
192 anyhow::bail!("value for `--replicas` must be greater than zero");
193 }
194 replicas = Some(count);
195 }
196 Long("replicas-max") => {
197 let val = parser.value()?;
198 let count = term::args::number(&val)?;
199
200 if count == 0 {
201 anyhow::bail!("value for `--replicas-max` must be greater than zero");
202 }
203 max_replicas = Some(count);
204 }
205 Long("seed") => {
206 let val = parser.value()?;
207 let nid = term::args::nid(&val)?;
208
209 seeds.insert(nid);
210 }
211 Long("announce") | Short('a') => {
212 announce = true;
213 }
214 Long("inventory") | Short('i') => {
215 inventory = true;
216 }
217 Long("sort-by") if matches!(op, Some(Operation::Status)) => {
218 let value = parser.value()?;
219 sort_by = value.parse()?;
220 }
221 Long("timeout") | Short('t') => {
222 let value = parser.value()?;
223 let secs = term::args::parse_value("timeout", value)?;
224
225 timeout = time::Duration::from_secs(secs);
226 }
227 Long("help") | Short('h') => {
228 return Err(Error::Help.into());
229 }
230 Value(val) if rid.is_none() => match val.to_string_lossy().as_ref() {
231 "s" | "status" => {
232 op = Some(Operation::Status);
233 }
234 _ => {
235 rid = Some(term::args::rid(&val)?);
236 }
237 },
238 arg => {
239 return Err(anyhow!(arg.unexpected()));
240 }
241 }
242 }
243
244 let sync = if inventory && fetch {
245 anyhow::bail!("`--inventory` cannot be used with `--fetch`");
246 } else if inventory {
247 SyncMode::Inventory
248 } else {
249 let direction = match (fetch, announce) {
250 (true, true) | (false, false) => SyncDirection::Both,
251 (true, false) => SyncDirection::Fetch,
252 (false, true) => SyncDirection::Announce,
253 };
254 let mut settings = SyncSettings::default().timeout(timeout);
255
256 let replicas = match (replicas, max_replicas) {
257 (None, None) => sync::ReplicationFactor::default(),
258 (None, Some(min)) => sync::ReplicationFactor::must_reach(min),
259 (Some(min), None) => sync::ReplicationFactor::must_reach(min),
260 (Some(min), Some(max)) => sync::ReplicationFactor::range(min, max),
261 };
262 settings.replicas = replicas;
263 if !seeds.is_empty() {
264 settings.seeds = seeds;
265 }
266 SyncMode::Repo {
267 settings,
268 direction,
269 }
270 };
271
272 Ok((
273 Options {
274 rid,
275 debug,
276 verbose,
277 sort_by,
278 op: op.unwrap_or(Operation::Synchronize(sync)),
279 },
280 vec![],
281 ))
282 }
283}
284
285pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
286 let profile = ctx.profile()?;
287 let mut node = radicle::Node::new(profile.socket());
288 if !node.is_running() {
289 anyhow::bail!(
290 "to sync a repository, your node must be running. To start it, run `rad node start`"
291 );
292 }
293
294 match &options.op {
295 Operation::Status => {
296 let rid = match options.rid {
297 Some(rid) => rid,
298 None => {
299 let (_, rid) = radicle::rad::cwd()
300 .context("Current directory is not a Radicle repository")?;
301 rid
302 }
303 };
304 sync_status(rid, &mut node, &profile, &options)?;
305 }
306 Operation::Synchronize(SyncMode::Repo {
307 settings,
308 direction,
309 }) => {
310 let rid = match options.rid {
311 Some(rid) => rid,
312 None => {
313 let (_, rid) = radicle::rad::cwd()
314 .context("Current directory is not a Radicle repository")?;
315 rid
316 }
317 };
318 let settings = settings.clone().with_profile(&profile);
319
320 if [SyncDirection::Fetch, SyncDirection::Both].contains(direction) {
321 if !profile.policies()?.is_seeding(&rid)? {
322 anyhow::bail!("repository {rid} is not seeded");
323 }
324 let result = fetch(rid, settings.clone(), &mut node, &profile)?;
325 display_fetch_result(&result, options.verbose)
326 }
327 if [SyncDirection::Announce, SyncDirection::Both].contains(direction) {
328 announce_refs(rid, settings, &mut node, &profile, &options)?;
329 }
330 }
331 Operation::Synchronize(SyncMode::Inventory) => {
332 announce_inventory(node)?;
333 }
334 }
335 Ok(())
336}
337
338fn sync_status(
339 rid: RepoId,
340 node: &mut Node,
341 profile: &Profile,
342 options: &Options,
343) -> anyhow::Result<()> {
344 const SYMBOL_STATE: &str = "?";
345 const SYMBOL_STATE_UNKNOWN: &str = "•";
346
347 let mut table = Table::<5, term::Label>::new(TableOptions::bordered());
348 let mut seeds: Vec<_> = node.seeds(rid)?.into();
349 let local_nid = node.nid()?;
350 let aliases = profile.aliases();
351
352 table.header([
353 term::format::bold("Node ID").into(),
354 term::format::bold("Alias").into(),
355 term::format::bold(SYMBOL_STATE).into(),
356 term::format::bold("SigRefs").into(),
357 term::format::bold("Timestamp").into(),
358 ]);
359 table.divider();
360
361 sort_seeds_by(local_nid, &mut seeds, &aliases, &options.sort_by);
362
363 let seeds = seeds.into_iter().flat_map(|seed| {
364 let (status, head, time) = match seed.sync {
365 Some(SyncStatus::Synced {
366 at: SyncedAt { oid, timestamp },
367 }) => (
368 term::PREFIX_SUCCESS,
369 term::format::oid(oid),
370 term::format::timestamp(timestamp),
371 ),
372 Some(SyncStatus::OutOfSync {
373 remote: SyncedAt { timestamp, .. },
374 local,
375 ..
376 }) if seed.nid == local_nid => (
377 term::PREFIX_WARNING,
378 term::format::oid(local.oid),
379 term::format::timestamp(timestamp),
380 ),
381 Some(SyncStatus::OutOfSync {
382 remote: SyncedAt { oid, timestamp },
383 ..
384 }) => (
385 term::PREFIX_ERROR,
386 term::format::oid(oid),
387 term::format::timestamp(timestamp),
388 ),
389 None if options.verbose => (
390 term::format::dim(SYMBOL_STATE_UNKNOWN),
391 term::paint(String::new()),
392 term::paint(String::new()),
393 ),
394 None => return None,
395 };
396
397 let (alias, nid) = Author::new(&seed.nid, profile, options.verbose).labels();
398
399 Some([
400 nid,
401 alias,
402 status.into(),
403 term::format::secondary(head).into(),
404 time.dim().italic().into(),
405 ])
406 });
407
408 table.extend(seeds);
409 table.print();
410
411 if profile.hints() {
412 const COLUMN_WIDTH: usize = 16;
413 let status = format!(
414 "\n{:>4} … {}\n {} {}\n {} {}",
415 term::Paint::from(SYMBOL_STATE.to_string()).fg(radicle_term::Color::White),
416 term::format::dim("Status:"),
417 format_args!(
418 "{} {:width$}",
419 term::PREFIX_SUCCESS,
420 term::format::dim("… in sync"),
421 width = COLUMN_WIDTH,
422 ),
423 format_args!(
424 "{} {}",
425 term::PREFIX_ERROR,
426 term::format::dim("… out of sync")
427 ),
428 format_args!(
429 "{} {:width$}",
430 term::PREFIX_WARNING,
431 term::format::dim("… not announced"),
432 width = COLUMN_WIDTH,
433 ),
434 format_args!(
435 "{} {}",
436 term::format::dim(SYMBOL_STATE_UNKNOWN),
437 term::format::dim("… unknown")
438 ),
439 );
440 term::hint(status);
441 }
442
443 Ok(())
444}
445
446fn announce_refs(
447 rid: RepoId,
448 settings: SyncSettings,
449 node: &mut Node,
450 profile: &Profile,
451 options: &Options,
452) -> anyhow::Result<()> {
453 let Ok(repo) = profile.storage.repository(rid) else {
454 return Err(anyhow!(
455 "nothing to announce, repository {rid} is not available locally"
456 ));
457 };
458 if let Err(e) = repo.remote(&profile.public_key) {
459 if e.is_not_found() {
460 term::print(term::format::italic(
461 "Nothing to announce, you don't have a fork of this repository.",
462 ));
463 return Ok(());
464 } else {
465 return Err(anyhow!("failed to load local fork of {rid}: {e}"));
466 }
467 }
468
469 let result = crate::node::announce(
470 &repo,
471 settings,
472 SyncReporting {
473 debug: options.debug,
474 ..SyncReporting::default()
475 },
476 node,
477 profile,
478 )?;
479 if let Some(result) = result {
480 print_announcer_result(&result, options.verbose)
481 }
482
483 Ok(())
484}
485
486pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
487 let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count();
488 let spinner = term::spinner(format!("Announcing inventory to {peers} peers.."));
489
490 node.announce_inventory()?;
491 spinner.finish();
492
493 Ok(())
494}
495
496#[derive(Debug, thiserror::Error)]
497pub enum FetchError {
498 #[error(transparent)]
499 Node(#[from] node::Error),
500 #[error(transparent)]
501 Db(#[from] node::db::Error),
502 #[error(transparent)]
503 Address(#[from] node::address::Error),
504 #[error(transparent)]
505 Fetcher(#[from] sync::FetcherError),
506}
507
508pub fn fetch(
509 rid: RepoId,
510 settings: SyncSettings,
511 node: &mut Node,
512 profile: &Profile,
513) -> Result<sync::FetcherResult, FetchError> {
514 let db = profile.database()?;
515 let local = profile.id();
516 let is_private = profile.storage.repository(rid).ok().and_then(|repo| {
517 let doc = repo.identity_doc().ok()?.doc;
518 sync::PrivateNetwork::private_repo(&doc)
519 });
520 let config = match is_private {
521 Some(private) => sync::FetcherConfig::private(private, settings.replicas, *local),
522 None => {
523 let seeds = node.seeds(rid)?;
526 let (connected, disconnected) = seeds.partition();
527 let candidates = connected
528 .into_iter()
529 .map(|seed| seed.nid)
530 .chain(disconnected.into_iter().map(|seed| seed.nid))
531 .map(sync::fetch::Candidate::new);
532 sync::FetcherConfig::public(settings.seeds.clone(), settings.replicas, *local)
533 .with_candidates(candidates)
534 }
535 };
536 let mut fetcher = sync::Fetcher::new(config)?;
537
538 let mut progress = fetcher.progress();
539 term::info!(
540 "Fetching {} from the network, found {} potential seed(s).",
541 term::format::tertiary(rid),
542 term::format::tertiary(progress.candidate())
543 );
544 let mut spinner = FetcherSpinner::new(fetcher.target(), &progress);
545
546 while let Some(nid) = fetcher.next_node() {
547 match node.session(nid)? {
548 Some(session) if session.is_connected() => fetcher.ready_to_fetch(nid, session.addr),
549 _ => {
550 let addrs = db.addresses_of(&nid)?;
551 if addrs.is_empty() {
552 fetcher.fetch_failed(nid, "Could not connect. No addresses known.");
553 } else if let Some(addr) = connect(
554 nid,
555 addrs.into_iter().map(|ka| ka.addr),
556 settings.timeout,
557 node,
558 &mut spinner,
559 &fetcher.progress(),
560 ) {
561 fetcher.ready_to_fetch(nid, addr)
562 } else {
563 fetcher
564 .fetch_failed(nid, "Could not connect. At least one address is known but all attempts timed out.");
565 }
566 }
567 }
568 if let Some((nid, addr)) = fetcher.next_fetch() {
569 spinner.emit_fetching(&nid, &addr, &progress);
570 let result = node.fetch(rid, nid, settings.timeout)?;
571 match fetcher.fetch_complete(nid, result) {
572 std::ops::ControlFlow::Continue(update) => {
573 spinner.emit_progress(&update);
574 progress = update
575 }
576 std::ops::ControlFlow::Break(success) => {
577 spinner.finished(success.outcome());
578 return Ok(sync::FetcherResult::TargetReached(success));
579 }
580 }
581 }
582 }
583 let result = fetcher.finish();
584 match &result {
585 sync::FetcherResult::TargetReached(success) => {
586 spinner.finished(success.outcome());
587 }
588 sync::FetcherResult::TargetError(missed) => spinner.failed(missed),
589 }
590 Ok(result)
591}
592
593fn connect(
597 nid: NodeId,
598 addrs: impl Iterator<Item = node::Address>,
599 timeout: time::Duration,
600 node: &mut Node,
601 spinner: &mut FetcherSpinner,
602 progress: &sync::fetch::Progress,
603) -> Option<node::Address> {
604 for addr in addrs {
605 spinner.emit_dialing(&nid, &addr, progress);
606 let cr = node.connect(
607 nid,
608 addr.clone(),
609 node::ConnectOptions {
610 persistent: false,
611 timeout,
612 },
613 );
614
615 match cr {
616 Ok(node::ConnectResult::Connected) => {
617 return Some(addr);
618 }
619 Ok(node::ConnectResult::Disconnected { .. }) => {
620 continue;
621 }
622 Err(e) => {
623 log::warn!(target: "cli", "Failed to connect to {nid}@{addr}: {e}");
624 continue;
625 }
626 }
627 }
628 None
629}
630
631fn sort_seeds_by(local: NodeId, seeds: &mut [Seed], aliases: &impl AliasStore, sort_by: &SortBy) {
632 let compare = |a: &Seed, b: &Seed| match sort_by {
633 SortBy::Nid => a.nid.cmp(&b.nid),
634 SortBy::Alias => {
635 let a = aliases.alias(&a.nid);
636 let b = aliases.alias(&b.nid);
637 a.cmp(&b)
638 }
639 SortBy::Status => match (&a.sync, &b.sync) {
640 (Some(_), None) => Ordering::Less,
641 (None, Some(_)) => Ordering::Greater,
642 (Some(a), Some(b)) => a.cmp(b).reverse(),
643 (None, None) => Ordering::Equal,
644 },
645 };
646
647 seeds.sort_by(|a, b| {
649 if a.nid == local {
650 Ordering::Less
651 } else if b.nid == local {
652 Ordering::Greater
653 } else {
654 compare(a, b)
655 }
656 });
657}
658
659struct FetcherSpinner {
660 preferred_seeds: usize,
661 replicas: sync::ReplicationFactor,
662 spinner: term::Spinner,
663}
664
665impl FetcherSpinner {
666 fn new(target: &sync::fetch::Target, progress: &sync::fetch::Progress) -> Self {
667 let preferred_seeds = target.preferred_seeds().len();
668 let replicas = target.replicas();
669 let spinner = term::spinner(format!(
670 "{} of {} preferred seeds, and {} of at least {} total seeds.",
671 term::format::secondary(progress.preferred()),
672 term::format::secondary(preferred_seeds),
673 term::format::secondary(progress.succeeded()),
674 term::format::secondary(replicas.lower_bound())
675 ));
676 Self {
677 preferred_seeds: target.preferred_seeds().len(),
678 replicas: *target.replicas(),
679 spinner,
680 }
681 }
682
683 fn emit_progress(&mut self, progress: &sync::fetch::Progress) {
684 self.spinner.message(format!(
685 "{} of {} preferred seeds, and {} of at least {} total seeds.",
686 term::format::secondary(progress.preferred()),
687 term::format::secondary(self.preferred_seeds),
688 term::format::secondary(progress.succeeded()),
689 term::format::secondary(self.replicas.lower_bound()),
690 ))
691 }
692
693 fn emit_fetching(
694 &mut self,
695 node: &NodeId,
696 addr: &node::Address,
697 progress: &sync::fetch::Progress,
698 ) {
699 self.spinner.message(format!(
700 "{} of {} preferred seeds, and {} of at least {} total seeds… [fetch {}@{}]",
701 term::format::secondary(progress.preferred()),
702 term::format::secondary(self.preferred_seeds),
703 term::format::secondary(progress.succeeded()),
704 term::format::secondary(self.replicas.lower_bound()),
705 term::format::tertiary(term::format::node_id_human_compact(node)),
706 term::format::tertiary(term::format::addr_compact(addr)),
707 ))
708 }
709
710 fn emit_dialing(
711 &mut self,
712 node: &NodeId,
713 addr: &node::Address,
714 progress: &sync::fetch::Progress,
715 ) {
716 self.spinner.message(format!(
717 "{} of {} preferred seeds, and {} of at least {} total seeds… [dial {}@{}]",
718 term::format::secondary(progress.preferred()),
719 term::format::secondary(self.preferred_seeds),
720 term::format::secondary(progress.succeeded()),
721 term::format::secondary(self.replicas.lower_bound()),
722 term::format::tertiary(term::format::node_id_human_compact(node)),
723 term::format::tertiary(term::format::addr_compact(addr)),
724 ))
725 }
726
727 fn finished(mut self, outcome: &SuccessfulOutcome) {
728 match outcome {
729 SuccessfulOutcome::PreferredNodes { preferred } => {
730 self.spinner.message(format!(
731 "Target met: {} preferred seed(s).",
732 term::format::positive(preferred),
733 ));
734 }
735 SuccessfulOutcome::MinReplicas { succeeded, .. } => {
736 self.spinner.message(format!(
737 "Target met: {} seed(s)",
738 term::format::positive(succeeded)
739 ));
740 }
741 SuccessfulOutcome::MaxReplicas {
742 succeeded,
743 min,
744 max,
745 } => {
746 self.spinner.message(format!(
747 "Target met: {} of {} min and {} max seed(s)",
748 succeeded,
749 term::format::secondary(min),
750 term::format::secondary(max)
751 ));
752 }
753 }
754 self.spinner.finish()
755 }
756
757 fn failed(mut self, missed: &sync::fetch::TargetMissed) {
758 let mut message = "Target not met: ".to_string();
759 let missing_preferred_seeds = missed
760 .missed_nodes()
761 .iter()
762 .map(|nid| term::format::node_id_human(nid).to_string())
763 .collect::<Vec<_>>();
764 let required = missed.required_nodes();
765 if !missing_preferred_seeds.is_empty() {
766 message.push_str(&format!(
767 "could not fetch from [{}], and required {} more seed(s)",
768 missing_preferred_seeds.join(", "),
769 required
770 ));
771 } else {
772 message.push_str(&format!("required {required} more seed(s)"));
773 }
774 self.spinner.message(message);
775 self.spinner.failed();
776 }
777}
778
779fn display_fetch_result(result: &sync::FetcherResult, verbose: bool) {
780 match result {
781 sync::FetcherResult::TargetReached(success) => {
782 let progress = success.progress();
783 let results = success.fetch_results();
784 display_success(results.success(), verbose);
785 let failed = progress.failed();
786 if failed > 0 && verbose {
787 term::warning(format!("Failed to fetch from {failed} seed(s)."));
788 for (node, reason) in results.failed() {
789 term::warning(format!(
790 "{}: {}",
791 term::format::node_id_human(node),
792 term::format::yellow(reason),
793 ))
794 }
795 }
796 }
797 sync::FetcherResult::TargetError(failed) => {
798 let results = failed.fetch_results();
799 let progress = failed.progress();
800 let target = failed.target();
801 let succeeded = progress.succeeded();
802 let missed = failed.missed_nodes();
803 term::error(format!(
804 "Fetched from {} preferred seed(s), could not reach {} seed(s)",
805 succeeded,
806 target.replicas().lower_bound(),
807 ));
808 term::error(format!(
809 "Could not replicate from {} preferred seed(s)",
810 missed.len()
811 ));
812 for (node, reason) in results.failed() {
813 term::error(format!(
814 "{}: {}",
815 term::format::node_id_human(node),
816 term::format::negative(reason),
817 ))
818 }
819 if succeeded > 0 {
820 term::info!("Successfully fetched from the following seeds:");
821 display_success(results.success(), verbose)
822 }
823 }
824 }
825}
826
827fn display_success<'a>(
828 results: impl Iterator<Item = (&'a NodeId, &'a [RefUpdate], HashSet<NodeId>)>,
829 verbose: bool,
830) {
831 for (node, updates, _) in results {
832 term::println(
833 "🌱 Fetched from",
834 term::format::secondary(term::format::node_id_human(node)),
835 );
836 if verbose {
837 let mut updates = updates
838 .iter()
839 .filter(|up| !matches!(up, RefUpdate::Skipped { .. }))
840 .peekable();
841 if updates.peek().is_none() {
842 term::indented(term::format::italic("no references were updated"));
843 } else {
844 for update in updates {
845 term::indented(term::format::ref_update_verbose(update))
846 }
847 }
848 }
849 }
850}
851
852fn print_announcer_result(result: &sync::AnnouncerResult, verbose: bool) {
853 use sync::announce::SuccessfulOutcome::*;
854 match result {
855 sync::AnnouncerResult::Success(success) if verbose => {
856 match success.outcome() {
859 MinReplicationFactor { preferred, synced }
860 | MaxReplicationFactor { preferred, synced }
861 | PreferredNodes {
862 preferred,
863 total_nodes_synced: synced,
864 } => {
865 if preferred == 0 {
866 term::success!("Synced {} seed(s)", term::format::positive(synced));
867 } else {
868 term::success!(
869 "Synced {} preferred seed(s) and {} total seed(s)",
870 term::format::positive(preferred),
871 term::format::positive(synced)
872 );
873 }
874 }
875 }
876 print_synced(success.synced());
877 }
878 sync::AnnouncerResult::Success(_) => {
879 }
881 sync::AnnouncerResult::TimedOut(result) => {
882 if result.synced().is_empty() {
883 term::error("All seeds timed out, use `rad sync -v` to see the list of seeds");
884 return;
885 }
886 let timed_out = result.timed_out();
887 term::warning(format!(
888 "{} seed(s) timed out, use `rad sync -v` to see the list of seeds",
889 timed_out.len(),
890 ));
891 if verbose {
892 print_synced(result.synced());
893 for node in timed_out {
894 term::warning(format!("{} timed out", term::format::node_id_human(node)));
895 }
896 }
897 }
898 sync::AnnouncerResult::NoNodes(result) => {
899 term::info!("Announcement could not sync with anymore seeds.");
900 if verbose {
901 print_synced(result.synced())
902 }
903 }
904 }
905}
906
907fn print_synced(synced: &BTreeMap<NodeId, sync::announce::SyncStatus>) {
908 for (node, status) in synced.iter() {
909 let mut message = format!("🌱 Synced with {}", term::format::node_id_human(node));
910
911 match status {
912 sync::announce::SyncStatus::AlreadySynced => {
913 message.push_str(&format!("{}", term::format::dim(" (already in sync)")));
914 }
915 sync::announce::SyncStatus::Synced { duration } => {
916 message.push_str(&format!(
917 "{}",
918 term::format::dim(format!(" in {}s", duration.as_secs()))
919 ));
920 }
921 }
922 term::info!("{}", message);
923 }
924}