1mod args;
2
3use std::cmp::Ordering;
4use std::collections::BTreeMap;
5use std::collections::HashSet;
6use std::time;
7
8use anyhow::{anyhow, Context as _};
9
10use radicle::node;
11use radicle::node::address::Store;
12use radicle::node::sync;
13use radicle::node::sync::fetch::SuccessfulOutcome;
14use radicle::node::SyncedAt;
15use radicle::node::{AliasStore, Handle as _, Node, Seed, SyncStatus};
16use radicle::prelude::{NodeId, Profile, RepoId};
17use radicle::storage::ReadRepository;
18use radicle::storage::RefUpdate;
19use radicle::storage::{ReadStorage, RemoteRepository};
20use radicle_term::Element;
21
22use crate::node::SyncReporting;
23use crate::node::SyncSettings;
24use crate::terminal as term;
25use crate::terminal::format::Author;
26use crate::terminal::{Table, TableOptions};
27
28pub use args::Args;
29use args::{Command, SortBy, SyncDirection, SyncMode};
30
31pub fn run(args: Args, ctx: impl term::Context) -> anyhow::Result<()> {
32 let profile = ctx.profile()?;
33 let mut node = radicle::Node::new(profile.socket());
34 if !node.is_running() {
35 anyhow::bail!(
36 "to sync a repository, your node must be running. To start it, run `rad node start`"
37 );
38 }
39 let verbose = args.verbose;
40 let debug = args.verbose;
41
42 match args.command {
43 Some(Command::Status { rid, sort_by }) => {
44 let rid = match rid {
45 Some(rid) => rid,
46 None => {
47 let (_, rid) = radicle::rad::cwd()
48 .context("Current directory is not a Radicle repository")?;
49 rid
50 }
51 };
52 sync_status(rid, &mut node, &profile, &sort_by, verbose)?;
53 }
54 None => match SyncMode::from(args.sync) {
55 SyncMode::Repo {
56 rid,
57 settings,
58 direction,
59 } => {
60 let rid = match rid {
61 Some(rid) => rid,
62 None => {
63 let (_, rid) = radicle::rad::cwd()
64 .context("Current directory is not a Radicle repository")?;
65 rid
66 }
67 };
68 let settings = settings.clone().with_profile(&profile);
69
70 if matches!(direction, SyncDirection::Fetch | SyncDirection::Both) {
71 if !profile.policies()?.is_seeding(&rid)? {
72 anyhow::bail!("repository {rid} is not seeded");
73 }
74 let result = fetch(rid, settings.clone(), &mut node, &profile)?;
75 display_fetch_result(&result, verbose)
76 }
77 if matches!(direction, SyncDirection::Announce | SyncDirection::Both) {
78 announce_refs(rid, settings, &mut node, &profile, verbose, debug)?;
79 }
80 }
81 SyncMode::Inventory => {
82 announce_inventory(node)?;
83 }
84 },
85 }
86
87 Ok(())
88}
89
90fn sync_status(
91 rid: RepoId,
92 node: &mut Node,
93 profile: &Profile,
94 sort_by: &SortBy,
95 verbose: bool,
96) -> anyhow::Result<()> {
97 const SYMBOL_STATE: &str = "?";
98 const SYMBOL_STATE_UNKNOWN: &str = "•";
99
100 let mut table = Table::<5, term::Label>::new(TableOptions::bordered());
101 let mut seeds: Vec<_> = node.seeds_for(rid, [*profile.did()])?.into();
102 let local_nid = node.nid()?;
103 let aliases = profile.aliases();
104
105 table.header([
106 term::format::bold("Node ID").into(),
107 term::format::bold("Alias").into(),
108 term::format::bold(SYMBOL_STATE).into(),
109 term::format::bold("SigRefs").into(),
110 term::format::bold("Timestamp").into(),
111 ]);
112 table.divider();
113
114 sort_seeds_by(local_nid, &mut seeds, &aliases, sort_by);
115
116 let seeds = seeds.into_iter().flat_map(|seed| {
117 let (status, head, time) = match seed.sync {
118 Some(SyncStatus::Synced {
119 at: SyncedAt { oid, timestamp },
120 }) => (
121 term::PREFIX_SUCCESS,
122 term::format::oid(oid),
123 term::format::timestamp(timestamp),
124 ),
125 Some(SyncStatus::OutOfSync {
126 remote: SyncedAt { timestamp, .. },
127 local,
128 ..
129 }) if seed.nid == local_nid => (
130 term::PREFIX_WARNING,
131 term::format::oid(local.oid),
132 term::format::timestamp(timestamp),
133 ),
134 Some(SyncStatus::OutOfSync {
135 remote: SyncedAt { oid, timestamp },
136 ..
137 }) => (
138 term::PREFIX_ERROR,
139 term::format::oid(oid),
140 term::format::timestamp(timestamp),
141 ),
142 None if verbose => (
143 term::format::dim(SYMBOL_STATE_UNKNOWN),
144 term::paint(String::new()),
145 term::paint(String::new()),
146 ),
147 None => return None,
148 };
149
150 let (alias, nid) = Author::new(&seed.nid, profile, verbose).labels();
151
152 Some([
153 nid,
154 alias,
155 status.into(),
156 term::format::secondary(head).into(),
157 time.dim().italic().into(),
158 ])
159 });
160
161 table.extend(seeds);
162 table.print();
163
164 if profile.hints() {
165 const COLUMN_WIDTH: usize = 16;
166 let status = format!(
167 "\n{:>4} … {}\n {} {}\n {} {}",
168 term::Paint::from(SYMBOL_STATE.to_string()).fg(radicle_term::Color::White),
169 term::format::dim("Status:"),
170 format_args!(
171 "{} {:width$}",
172 term::PREFIX_SUCCESS,
173 term::format::dim("… in sync"),
174 width = COLUMN_WIDTH,
175 ),
176 format_args!(
177 "{} {}",
178 term::PREFIX_ERROR,
179 term::format::dim("… out of sync")
180 ),
181 format_args!(
182 "{} {:width$}",
183 term::PREFIX_WARNING,
184 term::format::dim("… not announced"),
185 width = COLUMN_WIDTH,
186 ),
187 format_args!(
188 "{} {}",
189 term::format::dim(SYMBOL_STATE_UNKNOWN),
190 term::format::dim("… unknown")
191 ),
192 );
193 term::hint(status);
194 }
195
196 Ok(())
197}
198
199fn announce_refs(
200 rid: RepoId,
201 settings: SyncSettings,
202 node: &mut Node,
203 profile: &Profile,
204 verbose: bool,
205 debug: bool,
206) -> anyhow::Result<()> {
207 let Ok(repo) = profile.storage.repository(rid) else {
208 return Err(anyhow!(
209 "nothing to announce, repository {rid} is not available locally"
210 ));
211 };
212 if let Err(e) = repo.remote(&profile.public_key) {
213 if e.is_not_found() {
214 term::print(term::format::italic(
215 "Nothing to announce, you don't have a fork of this repository.",
216 ));
217 return Ok(());
218 } else {
219 return Err(anyhow!("failed to load local fork of {rid}: {e}"));
220 }
221 }
222
223 let result = crate::node::announce(
224 &repo,
225 settings,
226 SyncReporting {
227 debug,
228 ..SyncReporting::default()
229 },
230 node,
231 profile,
232 )?;
233 if let Some(result) = result {
234 print_announcer_result(&result, verbose)
235 }
236
237 Ok(())
238}
239
240pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
241 let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count();
242 let spinner = term::spinner(format!("Announcing inventory to {peers} peers.."));
243
244 node.announce_inventory()?;
245 spinner.finish();
246
247 Ok(())
248}
249
/// Errors that can be returned by [`fetch`].
///
/// Every variant transparently wraps the underlying error, so the displayed
/// message is that of the wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum FetchError {
    /// An error reported by the node handle.
    #[error(transparent)]
    Node(#[from] node::Error),
    /// An error from the node database.
    #[error(transparent)]
    Db(#[from] node::db::Error),
    /// An error from the address store.
    #[error(transparent)]
    Address(#[from] node::address::Error),
    /// An error from the fetcher.
    #[error(transparent)]
    Fetcher(#[from] sync::FetcherError),
}
261
/// Fetches the repository `rid` from the network, reporting progress through a
/// terminal spinner.
///
/// A [`sync::Fetcher`] is configured from `settings` and then driven in a
/// loop: for every node it hands out, an existing connected session is reused
/// or a connection is attempted via [`connect`] using the addresses stored in
/// the profile database. Nodes that become ready are fetched from through
/// `node`, and the outcome is fed back into the fetcher.
///
/// Returns as soon as the fetcher reports that its target is reached, or with
/// the fetcher's final result once it has no more nodes to hand out.
///
/// # Errors
///
/// Returns a [`FetchError`] if the database cannot be opened, the node or the
/// address store cannot be queried, the fetcher cannot be constructed, or a
/// fetch request to the node fails.
pub fn fetch(
    rid: RepoId,
    settings: SyncSettings,
    node: &mut Node,
    profile: &Profile,
) -> Result<sync::FetcherResult, FetchError> {
    let db = profile.database()?;
    let local = profile.id();
    // `None` if the repository is not available locally, its identity document
    // cannot be loaded, or `private_repo` yields `None` for the document.
    let is_private = profile.storage.repository(rid).ok().and_then(|repo| {
        let doc = repo.identity_doc().ok()?.doc;
        sync::PrivateNetwork::private_repo(&doc)
    });
    let config = match is_private {
        Some(private) => sync::FetcherConfig::private(private, settings.replicas, *local),
        None => {
            // Besides the seeds given in the settings, offer the seeds known to
            // the node as candidates: connected ones first, followed by the
            // disconnected ones that have at least one known address.
            let seeds = node.seeds_for(rid, [*profile.did()])?;
            let (connected, disconnected) = seeds.partition();
            let candidates = connected
                .into_iter()
                .map(|seed| seed.nid)
                .chain(disconnected.into_iter().filter_map(|seed| {
                    (!seed.addrs.is_empty()).then_some(seed.nid)
                }))
                .map(sync::fetch::Candidate::new);
            sync::FetcherConfig::public(settings.seeds.clone(), settings.replicas, *local)
                .with_candidates(candidates)
        }
    };
    let mut fetcher = sync::Fetcher::new(config)?;

    let mut progress = fetcher.progress();
    term::info!(
        "Fetching {} from the network, found {} potential seed(s).",
        term::format::tertiary(rid),
        term::format::tertiary(progress.candidate())
    );
    let mut spinner = FetcherSpinner::new(fetcher.target(), &progress);

    while let Some(nid) = fetcher.next_node() {
        match node.session(nid)? {
            // Already connected: reuse the address of the existing session.
            Some(session) if session.is_connected() => fetcher.ready_to_fetch(nid, session.addr),
            // Otherwise try to connect using the addresses stored in the database.
            _ => {
                let addrs = db.addresses_of(&nid)?;
                if addrs.is_empty() {
                    fetcher.fetch_failed(nid, "Could not connect. No addresses known.");
                } else if let Some(addr) = connect(
                    nid,
                    addrs.into_iter().map(|ka| ka.addr),
                    settings.timeout,
                    node,
                    &mut spinner,
                    &fetcher.progress(),
                ) {
                    fetcher.ready_to_fetch(nid, addr)
                } else {
                    fetcher
                        .fetch_failed(nid, "Could not connect. At least one address is known but all attempts timed out.");
                }
            }
        }
        // Fetch from the next node the fetcher considers ready, if any.
        if let Some((nid, addr)) = fetcher.next_fetch() {
            spinner.emit_fetching(&nid, &addr, &progress);
            let result = node.fetch(
                rid,
                nid,
                settings.timeout,
                settings.signed_references_minimum_feature_level,
            )?;
            match fetcher.fetch_complete(nid, result) {
                // Target not reached yet: show the updated progress and go on.
                std::ops::ControlFlow::Continue(update) => {
                    spinner.emit_progress(&update);
                    progress = update
                }
                // Target reached: stop early.
                std::ops::ControlFlow::Break(success) => {
                    spinner.finished(success.outcome());
                    return Ok(sync::FetcherResult::TargetReached(success));
                }
            }
        }
    }
    // No more nodes to try: let the fetcher decide whether the target was met.
    let result = fetcher.finish();
    match &result {
        sync::FetcherResult::TargetReached(success) => {
            spinner.finished(success.outcome());
        }
        sync::FetcherResult::TargetError(missed) => spinner.failed(missed),
    }
    Ok(result)
}
354
355fn connect(
359 nid: NodeId,
360 addrs: impl Iterator<Item = node::Address>,
361 timeout: time::Duration,
362 node: &mut Node,
363 spinner: &mut FetcherSpinner,
364 progress: &sync::fetch::Progress,
365) -> Option<node::Address> {
366 for addr in addrs {
367 spinner.emit_dialing(&nid, &addr, progress);
368 let cr = node.connect(
369 nid,
370 addr.clone(),
371 node::ConnectOptions {
372 persistent: false,
373 timeout,
374 },
375 );
376
377 match cr {
378 Ok(node::ConnectResult::Connected) => {
379 return Some(addr);
380 }
381 Ok(node::ConnectResult::Disconnected { .. }) => {
382 continue;
383 }
384 Err(e) => {
385 log::warn!(target: "cli", "Failed to connect to {nid}@{addr}: {e}");
386 continue;
387 }
388 }
389 }
390 None
391}
392
393fn sort_seeds_by(local: NodeId, seeds: &mut [Seed], aliases: &impl AliasStore, sort_by: &SortBy) {
394 let compare = |a: &Seed, b: &Seed| match sort_by {
395 SortBy::Nid => a.nid.cmp(&b.nid),
396 SortBy::Alias => {
397 let a = aliases.alias(&a.nid);
398 let b = aliases.alias(&b.nid);
399 a.cmp(&b)
400 }
401 SortBy::Status => match (&a.sync, &b.sync) {
402 (Some(_), None) => Ordering::Less,
403 (None, Some(_)) => Ordering::Greater,
404 (Some(a), Some(b)) => a.cmp(b).reverse(),
405 (None, None) => Ordering::Equal,
406 },
407 };
408
409 seeds.sort_by(|a, b| {
411 if a.nid == local {
412 Ordering::Less
413 } else if b.nid == local {
414 Ordering::Greater
415 } else {
416 compare(a, b)
417 }
418 });
419}
420
/// Terminal spinner that reports the progress of a [`fetch`] run.
struct FetcherSpinner {
    /// Number of preferred seeds of the fetch target.
    preferred_seeds: usize,
    /// Replication factor of the fetch target; its lower bound is shown in the
    /// progress messages.
    replicas: sync::ReplicationFactor,
    /// The underlying terminal spinner whose message gets updated.
    spinner: term::Spinner,
}
426
427impl FetcherSpinner {
428 fn new(target: &sync::fetch::Target, progress: &sync::fetch::Progress) -> Self {
429 let preferred_seeds = target.preferred_seeds().len();
430 let replicas = target.replicas();
431 let spinner = term::spinner(format!(
432 "{} of {} preferred seeds, and {} of at least {} total seeds.",
433 term::format::secondary(progress.preferred()),
434 term::format::secondary(preferred_seeds),
435 term::format::secondary(progress.succeeded()),
436 term::format::secondary(replicas.lower_bound())
437 ));
438 Self {
439 preferred_seeds: target.preferred_seeds().len(),
440 replicas: *target.replicas(),
441 spinner,
442 }
443 }
444
445 fn emit_progress(&mut self, progress: &sync::fetch::Progress) {
446 self.spinner.message(format!(
447 "{} of {} preferred seeds, and {} of at least {} total seeds.",
448 term::format::secondary(progress.preferred()),
449 term::format::secondary(self.preferred_seeds),
450 term::format::secondary(progress.succeeded()),
451 term::format::secondary(self.replicas.lower_bound()),
452 ))
453 }
454
455 fn emit_fetching(
456 &mut self,
457 node: &NodeId,
458 addr: &node::Address,
459 progress: &sync::fetch::Progress,
460 ) {
461 self.spinner.message(format!(
462 "{} of {} preferred seeds, and {} of at least {} total seeds… [fetch {}@{}]",
463 term::format::secondary(progress.preferred()),
464 term::format::secondary(self.preferred_seeds),
465 term::format::secondary(progress.succeeded()),
466 term::format::secondary(self.replicas.lower_bound()),
467 term::format::tertiary(term::format::node_id_human_compact(node)),
468 term::format::tertiary(addr.display_compact()),
469 ))
470 }
471
472 fn emit_dialing(
473 &mut self,
474 node: &NodeId,
475 addr: &node::Address,
476 progress: &sync::fetch::Progress,
477 ) {
478 self.spinner.message(format!(
479 "{} of {} preferred seeds, and {} of at least {} total seeds… [dial {}@{}]",
480 term::format::secondary(progress.preferred()),
481 term::format::secondary(self.preferred_seeds),
482 term::format::secondary(progress.succeeded()),
483 term::format::secondary(self.replicas.lower_bound()),
484 term::format::tertiary(term::format::node_id_human_compact(node)),
485 term::format::tertiary(addr.display_compact()),
486 ))
487 }
488
489 fn finished(mut self, outcome: &SuccessfulOutcome) {
490 match outcome {
491 SuccessfulOutcome::PreferredNodes { preferred } => {
492 self.spinner.message(format!(
493 "Target met: {} preferred seed(s).",
494 term::format::positive(preferred),
495 ));
496 }
497 SuccessfulOutcome::MinReplicas { succeeded, .. } => {
498 self.spinner.message(format!(
499 "Target met: {} seed(s)",
500 term::format::positive(succeeded)
501 ));
502 }
503 SuccessfulOutcome::MaxReplicas {
504 succeeded,
505 min,
506 max,
507 } => {
508 self.spinner.message(format!(
509 "Target met: {} of {} min and {} max seed(s)",
510 succeeded,
511 term::format::secondary(min),
512 term::format::secondary(max)
513 ));
514 }
515 }
516 self.spinner.finish()
517 }
518
519 fn failed(mut self, missed: &sync::fetch::TargetMissed) {
520 let mut message = "Target not met: ".to_string();
521 let missing_preferred_seeds = missed
522 .missed_nodes()
523 .iter()
524 .map(|nid| term::format::node_id_human(nid).to_string())
525 .collect::<Vec<_>>();
526 let required = missed.required_nodes();
527 if !missing_preferred_seeds.is_empty() {
528 message.push_str(&format!(
529 "could not fetch from [{}], and required {} more seed(s)",
530 missing_preferred_seeds.join(", "),
531 required
532 ));
533 } else {
534 message.push_str(&format!("required {required} more seed(s)"));
535 }
536 self.spinner.message(message);
537 self.spinner.failed();
538 }
539}
540
541fn display_fetch_result(result: &sync::FetcherResult, verbose: bool) {
542 match result {
543 sync::FetcherResult::TargetReached(success) => {
544 let progress = success.progress();
545 let results = success.fetch_results();
546 display_success(results.success(), verbose);
547 let failed = progress.failed();
548 if failed > 0 && verbose {
549 term::warning(format!("Failed to fetch from {failed} seed(s)."));
550 for (node, reason) in results.failed() {
551 term::warning(format!(
552 "{}: {}",
553 term::format::node_id_human(node),
554 term::format::yellow(reason),
555 ))
556 }
557 }
558 }
559 sync::FetcherResult::TargetError(failed) => {
560 let results = failed.fetch_results();
561 let progress = failed.progress();
562 let target = failed.target();
563 let succeeded = progress.succeeded();
564 let missed = failed.missed_nodes();
565 term::error(format!(
566 "Fetched from {} preferred seed(s), could not reach {} seed(s)",
567 succeeded,
568 target.replicas().lower_bound(),
569 ));
570 term::error(format!(
571 "Could not replicate from {} preferred seed(s)",
572 missed.len()
573 ));
574 for (node, reason) in results.failed() {
575 term::error(format!(
576 "{}: {}",
577 term::format::node_id_human(node),
578 term::format::negative(reason),
579 ))
580 }
581 if succeeded > 0 {
582 term::info!("Successfully fetched from the following seeds:");
583 display_success(results.success(), verbose)
584 }
585 }
586 }
587}
588
589fn display_success<'a>(
590 results: impl Iterator<Item = (&'a NodeId, &'a [RefUpdate], HashSet<NodeId>)>,
591 verbose: bool,
592) {
593 for (node, updates, _) in results {
594 term::println(
595 "🌱 Fetched from",
596 term::format::secondary(term::format::node_id_human(node)),
597 );
598 if verbose {
599 let mut updates = updates
600 .iter()
601 .filter(|up| !matches!(up, RefUpdate::Skipped { .. }))
602 .peekable();
603 if updates.peek().is_none() {
604 term::indented(term::format::italic("no references were updated"));
605 } else {
606 for update in updates {
607 term::indented(term::format::ref_update_verbose(update))
608 }
609 }
610 }
611 }
612}
613
614fn print_announcer_result(result: &sync::AnnouncerResult, verbose: bool) {
615 use sync::announce::SuccessfulOutcome::*;
616 match result {
617 sync::AnnouncerResult::Success(success) if verbose => {
618 match success.outcome() {
621 MinReplicationFactor { preferred, synced }
622 | MaxReplicationFactor { preferred, synced }
623 | PreferredNodes {
624 preferred,
625 total_nodes_synced: synced,
626 } => {
627 if preferred == 0 {
628 term::success!("Synced {} seed(s)", term::format::positive(synced));
629 } else {
630 term::success!(
631 "Synced {} preferred seed(s) and {} total seed(s)",
632 term::format::positive(preferred),
633 term::format::positive(synced)
634 );
635 }
636 }
637 }
638 print_synced(success.synced());
639 }
640 sync::AnnouncerResult::Success(_) => {
641 }
643 sync::AnnouncerResult::TimedOut(result) => {
644 if result.synced().is_empty() {
645 term::error("All seeds timed out, use `rad sync -v` to see the list of seeds");
646 return;
647 }
648 let timed_out = result.timed_out();
649 term::warning(format!(
650 "{} seed(s) timed out, use `rad sync -v` to see the list of seeds",
651 timed_out.len(),
652 ));
653 if verbose {
654 print_synced(result.synced());
655 for node in timed_out {
656 term::warning(format!("{} timed out", term::format::node_id_human(node)));
657 }
658 }
659 }
660 sync::AnnouncerResult::NoNodes(result) => {
661 term::info!("Announcement could not sync with anymore seeds.");
662 if verbose {
663 print_synced(result.synced())
664 }
665 }
666 }
667}
668
669fn print_synced(synced: &BTreeMap<NodeId, sync::announce::SyncStatus>) {
670 for (node, status) in synced.iter() {
671 let mut message = format!("🌱 Synced with {}", term::format::node_id_human(node));
672
673 match status {
674 sync::announce::SyncStatus::AlreadySynced => {
675 message.push_str(&format!("{}", term::format::dim(" (already in sync)")));
676 }
677 sync::announce::SyncStatus::Synced { duration } => {
678 message.push_str(&format!(
679 "{}",
680 term::format::dim(format!(" in {}s", duration.as_secs()))
681 ));
682 }
683 }
684 term::info!("{}", message);
685 }
686}