Skip to main content

radicle_cli/commands/
sync.rs

1mod args;
2
3use std::cmp::Ordering;
4use std::collections::BTreeMap;
5use std::collections::HashSet;
6use std::time;
7
8use anyhow::{anyhow, Context as _};
9
10use radicle::node;
11use radicle::node::address::Store;
12use radicle::node::sync;
13use radicle::node::sync::fetch::SuccessfulOutcome;
14use radicle::node::SyncedAt;
15use radicle::node::{AliasStore, Handle as _, Node, Seed, SyncStatus};
16use radicle::prelude::{NodeId, Profile, RepoId};
17use radicle::storage::ReadRepository;
18use radicle::storage::RefUpdate;
19use radicle::storage::{ReadStorage, RemoteRepository};
20use radicle_term::Element;
21
22use crate::node::SyncReporting;
23use crate::node::SyncSettings;
24use crate::terminal as term;
25use crate::terminal::format::Author;
26use crate::terminal::{Table, TableOptions};
27
28pub use args::Args;
29use args::{Command, SortBy, SyncDirection, SyncMode};
30
31pub fn run(args: Args, ctx: impl term::Context) -> anyhow::Result<()> {
32    let profile = ctx.profile()?;
33    let mut node = radicle::Node::new(profile.socket());
34    if !node.is_running() {
35        anyhow::bail!(
36            "to sync a repository, your node must be running. To start it, run `rad node start`"
37        );
38    }
39    let verbose = args.verbose;
40    let debug = args.verbose;
41
42    match args.command {
43        Some(Command::Status { rid, sort_by }) => {
44            let rid = match rid {
45                Some(rid) => rid,
46                None => {
47                    let (_, rid) = radicle::rad::cwd()
48                        .context("Current directory is not a Radicle repository")?;
49                    rid
50                }
51            };
52            sync_status(rid, &mut node, &profile, &sort_by, verbose)?;
53        }
54        None => match SyncMode::from(args.sync) {
55            SyncMode::Repo {
56                rid,
57                settings,
58                direction,
59            } => {
60                let rid = match rid {
61                    Some(rid) => rid,
62                    None => {
63                        let (_, rid) = radicle::rad::cwd()
64                            .context("Current directory is not a Radicle repository")?;
65                        rid
66                    }
67                };
68                let settings = settings.clone().with_profile(&profile);
69
70                if matches!(direction, SyncDirection::Fetch | SyncDirection::Both) {
71                    if !profile.policies()?.is_seeding(&rid)? {
72                        anyhow::bail!("repository {rid} is not seeded");
73                    }
74                    let result = fetch(rid, settings.clone(), &mut node, &profile)?;
75                    display_fetch_result(&result, verbose)
76                }
77                if matches!(direction, SyncDirection::Announce | SyncDirection::Both) {
78                    announce_refs(rid, settings, &mut node, &profile, verbose, debug)?;
79                }
80            }
81            SyncMode::Inventory => {
82                announce_inventory(node)?;
83            }
84        },
85    }
86
87    Ok(())
88}
89
/// Print a table with the sync status of every seed of `rid`.
///
/// The table has the columns node id, alias, status symbol, signed refs OID
/// and timestamp. Rows are ordered by `sort_by`, with the local node always
/// first. Seeds without any sync status are only listed when `verbose` is
/// set. If the profile has hints enabled, a legend for the status symbols is
/// printed after the table.
///
/// # Errors
///
/// Fails if the seeds or the local node id cannot be obtained from `node`.
fn sync_status(
    rid: RepoId,
    node: &mut Node,
    profile: &Profile,
    sort_by: &SortBy,
    verbose: bool,
) -> anyhow::Result<()> {
    // Header of the status column.
    const SYMBOL_STATE: &str = "?";
    // Status cell for seeds whose sync status is not known.
    const SYMBOL_STATE_UNKNOWN: &str = "•";

    let mut table = Table::<5, term::Label>::new(TableOptions::bordered());
    let mut seeds: Vec<_> = node.seeds_for(rid, [*profile.did()])?.into();
    let local_nid = node.nid()?;
    let aliases = profile.aliases();

    table.header([
        term::format::bold("Node ID").into(),
        term::format::bold("Alias").into(),
        term::format::bold(SYMBOL_STATE).into(),
        term::format::bold("SigRefs").into(),
        term::format::bold("Timestamp").into(),
    ]);
    table.divider();

    sort_seeds_by(local_nid, &mut seeds, &aliases, sort_by);

    // Turn every seed into a table row; returning `None` drops the seed.
    let seeds = seeds.into_iter().flat_map(|seed| {
        let (status, head, time) = match seed.sync {
            Some(SyncStatus::Synced {
                at: SyncedAt { oid, timestamp },
            }) => (
                term::PREFIX_SUCCESS,
                term::format::oid(oid),
                term::format::timestamp(timestamp),
            ),
            // The local node being out of sync is shown as a warning, using
            // the local OID together with the remote timestamp.
            Some(SyncStatus::OutOfSync {
                remote: SyncedAt { timestamp, .. },
                local,
                ..
            }) if seed.nid == local_nid => (
                term::PREFIX_WARNING,
                term::format::oid(local.oid),
                term::format::timestamp(timestamp),
            ),
            // Any other node being out of sync is shown as an error, using
            // the remote OID and timestamp.
            Some(SyncStatus::OutOfSync {
                remote: SyncedAt { oid, timestamp },
                ..
            }) => (
                term::PREFIX_ERROR,
                term::format::oid(oid),
                term::format::timestamp(timestamp),
            ),
            // Unknown status: only shown in verbose mode, with empty cells.
            None if verbose => (
                term::format::dim(SYMBOL_STATE_UNKNOWN),
                term::paint(String::new()),
                term::paint(String::new()),
            ),
            None => return None,
        };

        let (alias, nid) = Author::new(&seed.nid, profile, verbose).labels();

        Some([
            nid,
            alias,
            status.into(),
            term::format::secondary(head).into(),
            time.dim().italic().into(),
        ])
    });

    table.extend(seeds);
    table.print();

    if profile.hints() {
        // Legend explaining the symbols of the status column, laid out in
        // two rows of two entries each.
        const COLUMN_WIDTH: usize = 16;
        let status = format!(
            "\n{:>4} … {}\n       {}   {}\n       {}   {}",
            term::Paint::from(SYMBOL_STATE.to_string()).fg(radicle_term::Color::White),
            term::format::dim("Status:"),
            format_args!(
                "{} {:width$}",
                term::PREFIX_SUCCESS,
                term::format::dim("… in sync"),
                width = COLUMN_WIDTH,
            ),
            format_args!(
                "{} {}",
                term::PREFIX_ERROR,
                term::format::dim("… out of sync")
            ),
            format_args!(
                "{} {:width$}",
                term::PREFIX_WARNING,
                term::format::dim("… not announced"),
                width = COLUMN_WIDTH,
            ),
            format_args!(
                "{} {}",
                term::format::dim(SYMBOL_STATE_UNKNOWN),
                term::format::dim("… unknown")
            ),
        );
        term::hint(status);
    }

    Ok(())
}
198
199fn announce_refs(
200    rid: RepoId,
201    settings: SyncSettings,
202    node: &mut Node,
203    profile: &Profile,
204    verbose: bool,
205    debug: bool,
206) -> anyhow::Result<()> {
207    let Ok(repo) = profile.storage.repository(rid) else {
208        return Err(anyhow!(
209            "nothing to announce, repository {rid} is not available locally"
210        ));
211    };
212    if let Err(e) = repo.remote(&profile.public_key) {
213        if e.is_not_found() {
214            term::print(term::format::italic(
215                "Nothing to announce, you don't have a fork of this repository.",
216            ));
217            return Ok(());
218        } else {
219            return Err(anyhow!("failed to load local fork of {rid}: {e}"));
220        }
221    }
222
223    let result = crate::node::announce(
224        &repo,
225        settings,
226        SyncReporting {
227            debug,
228            ..SyncReporting::default()
229        },
230        node,
231        profile,
232    )?;
233    if let Some(result) = result {
234        print_announcer_result(&result, verbose)
235    }
236
237    Ok(())
238}
239
240pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
241    let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count();
242    let spinner = term::spinner(format!("Announcing inventory to {peers} peers.."));
243
244    node.announce_inventory()?;
245    spinner.finish();
246
247    Ok(())
248}
249
/// Errors returned by [`fetch`].
///
/// Every variant transparently wraps an underlying error and can be created
/// from it via `From`, so `?` converts these errors automatically.
#[derive(Debug, thiserror::Error)]
pub enum FetchError {
    /// Wraps a [`node::Error`].
    #[error(transparent)]
    Node(#[from] node::Error),
    /// Wraps a [`node::db::Error`].
    #[error(transparent)]
    Db(#[from] node::db::Error),
    /// Wraps a [`node::address::Error`].
    #[error(transparent)]
    Address(#[from] node::address::Error),
    /// Wraps a [`sync::FetcherError`].
    #[error(transparent)]
    Fetcher(#[from] sync::FetcherError),
}
261
/// Fetch repository `rid` from the network until the fetch target derived
/// from `settings` is reached or no candidate nodes are left.
///
/// Progress is reported on a terminal spinner. The returned
/// [`sync::FetcherResult`] says whether the target was reached; a missed
/// target is reported through the `Ok` value, not as an `Err`.
///
/// # Errors
///
/// Returns a [`FetchError`] if the profile database cannot be opened, the
/// fetcher cannot be built from its configuration, or a call to the node or
/// the address store fails.
pub fn fetch(
    rid: RepoId,
    settings: SyncSettings,
    node: &mut Node,
    profile: &Profile,
) -> Result<sync::FetcherResult, FetchError> {
    let db = profile.database()?;
    let local = profile.id();
    // `Some` only if the repository is available locally, its identity
    // document can be read, and `private_repo` yields a private network for
    // it. Any failure along the way falls back to the public configuration.
    let is_private = profile.storage.repository(rid).ok().and_then(|repo| {
        let doc = repo.identity_doc().ok()?.doc;
        sync::PrivateNetwork::private_repo(&doc)
    });
    let config = match is_private {
        Some(private) => sync::FetcherConfig::private(private, settings.replicas, *local),
        None => {
            // We push nodes that are in our seed list in attempt to fulfill the
            // replicas, if needed.
            let seeds = node.seeds_for(rid, [*profile.did()])?;
            let (connected, disconnected) = seeds.partition();
            // Connected seeds come first, followed by disconnected ones.
            let candidates = connected
                .into_iter()
                .map(|seed| seed.nid)
                .chain(disconnected.into_iter().filter_map(|seed| {
                    // Only consider seeds that have at least one known address.
                    (!seed.addrs.is_empty()).then_some(seed.nid)
                }))
                .map(sync::fetch::Candidate::new);
            sync::FetcherConfig::public(settings.seeds.clone(), settings.replicas, *local)
                .with_candidates(candidates)
        }
    };
    let mut fetcher = sync::Fetcher::new(config)?;

    // Last progress snapshot; replaced after every fetch that does not yet
    // reach the target.
    let mut progress = fetcher.progress();
    term::info!(
        "Fetching {} from the network, found {} potential seed(s).",
        term::format::tertiary(rid),
        term::format::tertiary(progress.candidate())
    );
    let mut spinner = FetcherSpinner::new(fetcher.target(), &progress);

    while let Some(nid) = fetcher.next_node() {
        // Step 1: make sure we have a connection to the node, dialing it if
        // necessary, and tell the fetcher whether it is ready.
        match node.session(nid)? {
            Some(session) if session.is_connected() => fetcher.ready_to_fetch(nid, session.addr),
            _ => {
                let addrs = db.addresses_of(&nid)?;
                if addrs.is_empty() {
                    fetcher.fetch_failed(nid, "Could not connect. No addresses known.");
                } else if let Some(addr) = connect(
                    nid,
                    addrs.into_iter().map(|ka| ka.addr),
                    settings.timeout,
                    node,
                    &mut spinner,
                    &fetcher.progress(),
                ) {
                    fetcher.ready_to_fetch(nid, addr)
                } else {
                    fetcher
                        .fetch_failed(nid, "Could not connect. At least one address is known but all attempts timed out.");
                }
            }
        }
        // Step 2: perform the next fetch the fetcher hands out, if any, and
        // feed the result back to it.
        if let Some((nid, addr)) = fetcher.next_fetch() {
            spinner.emit_fetching(&nid, &addr, &progress);
            let result = node.fetch(
                rid,
                nid,
                settings.timeout,
                settings.signed_references_minimum_feature_level,
            )?;
            match fetcher.fetch_complete(nid, result) {
                std::ops::ControlFlow::Continue(update) => {
                    spinner.emit_progress(&update);
                    progress = update
                }
                // Target reached: stop early, remaining nodes are not tried.
                std::ops::ControlFlow::Break(success) => {
                    spinner.finished(success.outcome());
                    return Ok(sync::FetcherResult::TargetReached(success));
                }
            }
        }
    }
    // All nodes were tried without an early exit; let the fetcher decide on
    // the final result.
    let result = fetcher.finish();
    match &result {
        sync::FetcherResult::TargetReached(success) => {
            spinner.finished(success.outcome());
        }
        sync::FetcherResult::TargetError(missed) => spinner.failed(missed),
    }
    Ok(result)
}
354
355// Try all addresses until one succeeds.
356// FIXME(fintohaps): I think this could return a `Result<node::Address,
357// Vec<AddressError>>` which could report back why each address failed
358fn connect(
359    nid: NodeId,
360    addrs: impl Iterator<Item = node::Address>,
361    timeout: time::Duration,
362    node: &mut Node,
363    spinner: &mut FetcherSpinner,
364    progress: &sync::fetch::Progress,
365) -> Option<node::Address> {
366    for addr in addrs {
367        spinner.emit_dialing(&nid, &addr, progress);
368        let cr = node.connect(
369            nid,
370            addr.clone(),
371            node::ConnectOptions {
372                persistent: false,
373                timeout,
374            },
375        );
376
377        match cr {
378            Ok(node::ConnectResult::Connected) => {
379                return Some(addr);
380            }
381            Ok(node::ConnectResult::Disconnected { .. }) => {
382                continue;
383            }
384            Err(e) => {
385                log::warn!(target: "cli", "Failed to connect to {nid}@{addr}: {e}");
386                continue;
387            }
388        }
389    }
390    None
391}
392
393fn sort_seeds_by(local: NodeId, seeds: &mut [Seed], aliases: &impl AliasStore, sort_by: &SortBy) {
394    let compare = |a: &Seed, b: &Seed| match sort_by {
395        SortBy::Nid => a.nid.cmp(&b.nid),
396        SortBy::Alias => {
397            let a = aliases.alias(&a.nid);
398            let b = aliases.alias(&b.nid);
399            a.cmp(&b)
400        }
401        SortBy::Status => match (&a.sync, &b.sync) {
402            (Some(_), None) => Ordering::Less,
403            (None, Some(_)) => Ordering::Greater,
404            (Some(a), Some(b)) => a.cmp(b).reverse(),
405            (None, None) => Ordering::Equal,
406        },
407    };
408
409    // Always show our local node first.
410    seeds.sort_by(|a, b| {
411        if a.nid == local {
412            Ordering::Less
413        } else if b.nid == local {
414            Ordering::Greater
415        } else {
416            compare(a, b)
417        }
418    });
419}
420
/// Terminal spinner that displays the progress of a repository fetch.
struct FetcherSpinner {
    /// Number of preferred seeds of the fetch target.
    preferred_seeds: usize,
    /// Replication factor of the fetch target; its lower bound is shown in
    /// the progress messages.
    replicas: sync::ReplicationFactor,
    /// The underlying terminal spinner whose message gets updated.
    spinner: term::Spinner,
}
426
427impl FetcherSpinner {
428    fn new(target: &sync::fetch::Target, progress: &sync::fetch::Progress) -> Self {
429        let preferred_seeds = target.preferred_seeds().len();
430        let replicas = target.replicas();
431        let spinner = term::spinner(format!(
432            "{} of {} preferred seeds, and {} of at least {} total seeds.",
433            term::format::secondary(progress.preferred()),
434            term::format::secondary(preferred_seeds),
435            term::format::secondary(progress.succeeded()),
436            term::format::secondary(replicas.lower_bound())
437        ));
438        Self {
439            preferred_seeds: target.preferred_seeds().len(),
440            replicas: *target.replicas(),
441            spinner,
442        }
443    }
444
445    fn emit_progress(&mut self, progress: &sync::fetch::Progress) {
446        self.spinner.message(format!(
447            "{} of {} preferred seeds, and {} of at least {} total seeds.",
448            term::format::secondary(progress.preferred()),
449            term::format::secondary(self.preferred_seeds),
450            term::format::secondary(progress.succeeded()),
451            term::format::secondary(self.replicas.lower_bound()),
452        ))
453    }
454
455    fn emit_fetching(
456        &mut self,
457        node: &NodeId,
458        addr: &node::Address,
459        progress: &sync::fetch::Progress,
460    ) {
461        self.spinner.message(format!(
462            "{} of {} preferred seeds, and {} of at least {} total seeds… [fetch {}@{}]",
463            term::format::secondary(progress.preferred()),
464            term::format::secondary(self.preferred_seeds),
465            term::format::secondary(progress.succeeded()),
466            term::format::secondary(self.replicas.lower_bound()),
467            term::format::tertiary(term::format::node_id_human_compact(node)),
468            term::format::tertiary(addr.display_compact()),
469        ))
470    }
471
472    fn emit_dialing(
473        &mut self,
474        node: &NodeId,
475        addr: &node::Address,
476        progress: &sync::fetch::Progress,
477    ) {
478        self.spinner.message(format!(
479            "{} of {} preferred seeds, and {} of at least {} total seeds… [dial {}@{}]",
480            term::format::secondary(progress.preferred()),
481            term::format::secondary(self.preferred_seeds),
482            term::format::secondary(progress.succeeded()),
483            term::format::secondary(self.replicas.lower_bound()),
484            term::format::tertiary(term::format::node_id_human_compact(node)),
485            term::format::tertiary(addr.display_compact()),
486        ))
487    }
488
489    fn finished(mut self, outcome: &SuccessfulOutcome) {
490        match outcome {
491            SuccessfulOutcome::PreferredNodes { preferred } => {
492                self.spinner.message(format!(
493                    "Target met: {} preferred seed(s).",
494                    term::format::positive(preferred),
495                ));
496            }
497            SuccessfulOutcome::MinReplicas { succeeded, .. } => {
498                self.spinner.message(format!(
499                    "Target met: {} seed(s)",
500                    term::format::positive(succeeded)
501                ));
502            }
503            SuccessfulOutcome::MaxReplicas {
504                succeeded,
505                min,
506                max,
507            } => {
508                self.spinner.message(format!(
509                    "Target met: {} of {} min and {} max seed(s)",
510                    succeeded,
511                    term::format::secondary(min),
512                    term::format::secondary(max)
513                ));
514            }
515        }
516        self.spinner.finish()
517    }
518
519    fn failed(mut self, missed: &sync::fetch::TargetMissed) {
520        let mut message = "Target not met: ".to_string();
521        let missing_preferred_seeds = missed
522            .missed_nodes()
523            .iter()
524            .map(|nid| term::format::node_id_human(nid).to_string())
525            .collect::<Vec<_>>();
526        let required = missed.required_nodes();
527        if !missing_preferred_seeds.is_empty() {
528            message.push_str(&format!(
529                "could not fetch from [{}], and required {} more seed(s)",
530                missing_preferred_seeds.join(", "),
531                required
532            ));
533        } else {
534            message.push_str(&format!("required {required} more seed(s)"));
535        }
536        self.spinner.message(message);
537        self.spinner.failed();
538    }
539}
540
541fn display_fetch_result(result: &sync::FetcherResult, verbose: bool) {
542    match result {
543        sync::FetcherResult::TargetReached(success) => {
544            let progress = success.progress();
545            let results = success.fetch_results();
546            display_success(results.success(), verbose);
547            let failed = progress.failed();
548            if failed > 0 && verbose {
549                term::warning(format!("Failed to fetch from {failed} seed(s)."));
550                for (node, reason) in results.failed() {
551                    term::warning(format!(
552                        "{}: {}",
553                        term::format::node_id_human(node),
554                        term::format::yellow(reason),
555                    ))
556                }
557            }
558        }
559        sync::FetcherResult::TargetError(failed) => {
560            let results = failed.fetch_results();
561            let progress = failed.progress();
562            let target = failed.target();
563            let succeeded = progress.succeeded();
564            let missed = failed.missed_nodes();
565            term::error(format!(
566                "Fetched from {} preferred seed(s), could not reach {} seed(s)",
567                succeeded,
568                target.replicas().lower_bound(),
569            ));
570            term::error(format!(
571                "Could not replicate from {} preferred seed(s)",
572                missed.len()
573            ));
574            for (node, reason) in results.failed() {
575                term::error(format!(
576                    "{}: {}",
577                    term::format::node_id_human(node),
578                    term::format::negative(reason),
579                ))
580            }
581            if succeeded > 0 {
582                term::info!("Successfully fetched from the following seeds:");
583                display_success(results.success(), verbose)
584            }
585        }
586    }
587}
588
589fn display_success<'a>(
590    results: impl Iterator<Item = (&'a NodeId, &'a [RefUpdate], HashSet<NodeId>)>,
591    verbose: bool,
592) {
593    for (node, updates, _) in results {
594        term::println(
595            "🌱 Fetched from",
596            term::format::secondary(term::format::node_id_human(node)),
597        );
598        if verbose {
599            let mut updates = updates
600                .iter()
601                .filter(|up| !matches!(up, RefUpdate::Skipped { .. }))
602                .peekable();
603            if updates.peek().is_none() {
604                term::indented(term::format::italic("no references were updated"));
605            } else {
606                for update in updates {
607                    term::indented(term::format::ref_update_verbose(update))
608                }
609            }
610        }
611    }
612}
613
/// Print the outcome of an announcement.
///
/// Successful announcements are only reported in verbose mode. Timeouts and
/// running out of nodes are always reported; the per-seed details are added
/// in verbose mode.
fn print_announcer_result(result: &sync::AnnouncerResult, verbose: bool) {
    use sync::announce::SuccessfulOutcome::*;
    match result {
        sync::AnnouncerResult::Success(success) if verbose => {
            // N.b. Printing how many seeds were synced with is printed
            // elsewhere
            match success.outcome() {
                // All outcomes carry a preferred and a total count and are
                // reported in the same way.
                MinReplicationFactor { preferred, synced }
                | MaxReplicationFactor { preferred, synced }
                | PreferredNodes {
                    preferred,
                    total_nodes_synced: synced,
                } => {
                    if preferred == 0 {
                        term::success!("Synced {} seed(s)", term::format::positive(synced));
                    } else {
                        term::success!(
                            "Synced {} preferred seed(s) and {} total seed(s)",
                            term::format::positive(preferred),
                            term::format::positive(synced)
                        );
                    }
                }
            }
            print_synced(success.synced());
        }
        sync::AnnouncerResult::Success(_) => {
            // Successes are ignored when `!verbose`.
        }
        sync::AnnouncerResult::TimedOut(result) => {
            // Nothing synced at all: report an error and stop here.
            if result.synced().is_empty() {
                term::error("All seeds timed out, use `rad sync -v` to see the list of seeds");
                return;
            }
            let timed_out = result.timed_out();
            term::warning(format!(
                "{} seed(s) timed out, use `rad sync -v` to see the list of seeds",
                timed_out.len(),
            ));
            if verbose {
                print_synced(result.synced());
                for node in timed_out {
                    term::warning(format!("{} timed out", term::format::node_id_human(node)));
                }
            }
        }
        sync::AnnouncerResult::NoNodes(result) => {
            term::info!("Announcement could not sync with anymore seeds.");
            if verbose {
                print_synced(result.synced())
            }
        }
    }
}
668
669fn print_synced(synced: &BTreeMap<NodeId, sync::announce::SyncStatus>) {
670    for (node, status) in synced.iter() {
671        let mut message = format!("🌱 Synced with {}", term::format::node_id_human(node));
672
673        match status {
674            sync::announce::SyncStatus::AlreadySynced => {
675                message.push_str(&format!("{}", term::format::dim(" (already in sync)")));
676            }
677            sync::announce::SyncStatus::Synced { duration } => {
678                message.push_str(&format!(
679                    "{}",
680                    term::format::dim(format!(" in {}s", duration.as_secs()))
681                ));
682            }
683        }
684        term::info!("{}", message);
685    }
686}