Skip to main content

radicle_cli/commands/
sync.rs

1mod args;
2
3use std::cmp::Ordering;
4use std::collections::BTreeMap;
5use std::collections::HashSet;
6use std::time;
7
8use anyhow::anyhow;
9
10use radicle::node;
11use radicle::node::SyncedAt;
12use radicle::node::address::Store;
13use radicle::node::sync;
14use radicle::node::sync::fetch::SuccessfulOutcome;
15use radicle::node::{AliasStore, Handle as _, Node, Seed, SyncStatus};
16use radicle::prelude::{NodeId, Profile, RepoId};
17use radicle::storage::ReadRepository;
18use radicle::storage::RefUpdate;
19use radicle::storage::{ReadStorage, RemoteRepository};
20use radicle_term::Element;
21
22use crate::node::SyncReporting;
23use crate::node::SyncSettings;
24use crate::terminal as term;
25use crate::terminal::args::rid_or_cwd;
26use crate::terminal::format::Author;
27use crate::terminal::{Table, TableOptions};
28
29pub use args::Args;
30use args::{Command, SortBy, SyncDirection, SyncMode};
31
32pub fn run(args: Args, ctx: impl term::Context) -> anyhow::Result<()> {
33    let profile = ctx.profile()?;
34    let mut node = radicle::Node::new(profile.socket_from_env());
35    if !node.is_running() {
36        anyhow::bail!(
37            "to sync a repository, your node must be running. To start it, run `rad node start`"
38        );
39    }
40    let verbose = args.verbose;
41    let debug = args.verbose;
42
43    match args.command {
44        Some(Command::Status { repo, sort_by }) => {
45            let (_, rid) = rid_or_cwd(repo)?;
46            sync_status(rid, &mut node, &profile, &sort_by, verbose)?;
47        }
48        None => match SyncMode::from(args.sync) {
49            SyncMode::Repo {
50                repo,
51                settings,
52                direction,
53            } => {
54                let (_, rid) = rid_or_cwd(repo)?;
55                let settings = settings.clone().with_profile(&profile);
56
57                if matches!(direction, SyncDirection::Fetch | SyncDirection::Both) {
58                    if !profile.policies()?.is_seeding(&rid)? {
59                        anyhow::bail!("repository {rid} is not seeded");
60                    }
61                    let result = fetch(rid, settings.clone(), &mut node, &profile)?;
62                    display_fetch_result(&result, verbose)
63                }
64                if matches!(direction, SyncDirection::Announce | SyncDirection::Both) {
65                    announce_refs(rid, settings, &mut node, &profile, verbose, debug)?;
66                }
67            }
68            SyncMode::Inventory => {
69                announce_inventory(node)?;
70            }
71        },
72    }
73
74    Ok(())
75}
76
77fn sync_status(
78    rid: RepoId,
79    node: &mut Node,
80    profile: &Profile,
81    sort_by: &SortBy,
82    verbose: bool,
83) -> anyhow::Result<()> {
84    const SYMBOL_STATE: &str = "?";
85    const SYMBOL_STATE_UNKNOWN: &str = "•";
86
87    let mut table = Table::<5, term::Label>::new(TableOptions::bordered());
88    let mut seeds: Vec<_> = node.seeds_for(rid, [*profile.did()])?.into();
89    let local_nid = node.nid()?;
90    let aliases = profile.aliases();
91
92    table.header([
93        term::format::bold("Node ID").into(),
94        term::format::bold("Alias").into(),
95        term::format::bold(SYMBOL_STATE).into(),
96        term::format::bold("SigRefs").into(),
97        term::format::bold("Timestamp").into(),
98    ]);
99    table.divider();
100
101    sort_seeds_by(local_nid, &mut seeds, &aliases, sort_by);
102
103    let seeds = seeds.into_iter().flat_map(|seed| {
104        let (status, head, time) = match seed.sync {
105            Some(SyncStatus::Synced {
106                at: SyncedAt { oid, timestamp },
107            }) => (
108                term::PREFIX_SUCCESS,
109                term::format::oid(oid),
110                term::format::timestamp(timestamp),
111            ),
112            Some(SyncStatus::OutOfSync {
113                remote: SyncedAt { timestamp, .. },
114                local,
115                ..
116            }) if seed.nid == local_nid => (
117                term::PREFIX_WARNING,
118                term::format::oid(local.oid),
119                term::format::timestamp(timestamp),
120            ),
121            Some(SyncStatus::OutOfSync {
122                remote: SyncedAt { oid, timestamp },
123                ..
124            }) => (
125                term::PREFIX_ERROR,
126                term::format::oid(oid),
127                term::format::timestamp(timestamp),
128            ),
129            None if verbose => (
130                term::format::dim(SYMBOL_STATE_UNKNOWN),
131                term::paint(String::new()),
132                term::paint(String::new()),
133            ),
134            None => return None,
135        };
136
137        let (alias, nid) = Author::new(&seed.nid, profile, verbose).labels();
138
139        Some([
140            nid,
141            alias,
142            status.into(),
143            term::format::secondary(head).into(),
144            time.dim().italic().into(),
145        ])
146    });
147
148    table.extend(seeds);
149    table.print();
150
151    if profile.hints() {
152        const COLUMN_WIDTH: usize = 16;
153        let status = format!(
154            "\n{:>4} … {}\n       {}   {}\n       {}   {}",
155            term::Paint::from(SYMBOL_STATE.to_string()).fg(radicle_term::Color::White),
156            term::format::dim("Status:"),
157            format_args!(
158                "{} {:width$}",
159                term::PREFIX_SUCCESS,
160                term::format::dim("… in sync"),
161                width = COLUMN_WIDTH,
162            ),
163            format_args!(
164                "{} {}",
165                term::PREFIX_ERROR,
166                term::format::dim("… out of sync")
167            ),
168            format_args!(
169                "{} {:width$}",
170                term::PREFIX_WARNING,
171                term::format::dim("… not announced"),
172                width = COLUMN_WIDTH,
173            ),
174            format_args!(
175                "{} {}",
176                term::format::dim(SYMBOL_STATE_UNKNOWN),
177                term::format::dim("… unknown")
178            ),
179        );
180        term::hint(status);
181    }
182
183    Ok(())
184}
185
186fn announce_refs(
187    rid: RepoId,
188    settings: SyncSettings,
189    node: &mut Node,
190    profile: &Profile,
191    verbose: bool,
192    debug: bool,
193) -> anyhow::Result<()> {
194    let Ok(repo) = profile.storage.repository(rid) else {
195        return Err(anyhow!(
196            "nothing to announce, repository {rid} is not available locally"
197        ));
198    };
199    if let Err(e) = repo.remote(&profile.public_key) {
200        if e.is_not_found() {
201            term::println(term::format::italic(
202                "Nothing to announce, you don't have a fork of this repository.",
203            ));
204            return Ok(());
205        } else {
206            return Err(anyhow!("failed to load local fork of {rid}: {e}"));
207        }
208    }
209
210    let result = crate::node::announce(
211        &repo,
212        settings,
213        SyncReporting {
214            debug,
215            ..SyncReporting::default()
216        },
217        node,
218        profile,
219    )?;
220    if let Some(result) = result {
221        print_announcer_result(&result, verbose)
222    }
223
224    Ok(())
225}
226
227pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
228    let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count();
229    let spinner = term::spinner(format!("Announcing inventory to {peers} peers.."));
230
231    node.announce_inventory()?;
232    spinner.finish();
233
234    Ok(())
235}
236
237#[derive(Debug, thiserror::Error)]
238pub enum FetchError {
239    #[error(transparent)]
240    Node(#[from] node::Error),
241    #[error(transparent)]
242    Db(#[from] node::db::Error),
243    #[error(transparent)]
244    Address(#[from] node::address::Error),
245    #[error(transparent)]
246    Fetcher(#[from] sync::FetcherError),
247}
248
249pub fn fetch(
250    rid: RepoId,
251    settings: SyncSettings,
252    node: &mut Node,
253    profile: &Profile,
254) -> Result<sync::FetcherResult, FetchError> {
255    let db = profile.database()?;
256    let local = profile.id();
257    let is_private = profile.storage.repository(rid).ok().and_then(|repo| {
258        let doc = repo.identity_doc().ok()?.doc;
259        sync::PrivateNetwork::private_repo(&doc)
260    });
261    let config = match is_private {
262        Some(private) => sync::FetcherConfig::private(private, settings.replicas, *local),
263        None => {
264            // We push nodes that are in our seed list in attempt to fulfill the
265            // replicas, if needed.
266            let seeds = node.seeds_for(rid, [*profile.did()])?;
267            let (connected, disconnected) = seeds.partition();
268            let candidates = connected
269                .into_iter()
270                .map(|seed| seed.nid)
271                .chain(disconnected.into_iter().filter_map(|seed| {
272                    // Only consider seeds that have at least one known address.
273                    (!seed.addrs.is_empty()).then_some(seed.nid)
274                }))
275                .map(sync::fetch::Candidate::new);
276            sync::FetcherConfig::public(settings.seeds.clone(), settings.replicas, *local)
277                .with_candidates(candidates)
278        }
279    };
280    let mut fetcher = sync::Fetcher::new(config)?;
281
282    let mut progress = fetcher.progress();
283    term::info!(
284        "Fetching {} from the network, found {} potential seed(s).",
285        term::format::tertiary(rid),
286        term::format::tertiary(progress.candidate())
287    );
288    let mut spinner = FetcherSpinner::new(fetcher.target(), &progress);
289
290    while let Some(nid) = fetcher.next_node() {
291        match node.session(nid)? {
292            Some(session) if session.is_connected() => fetcher.ready_to_fetch(nid, session.addr),
293            _ => {
294                let addrs = db.addresses_of(&nid)?;
295                if addrs.is_empty() {
296                    fetcher.fetch_failed(nid, "Could not connect. No addresses known.");
297                } else if let Some(addr) = connect(
298                    nid,
299                    addrs.into_iter().map(|ka| ka.addr),
300                    settings.timeout,
301                    node,
302                    &mut spinner,
303                    &fetcher.progress(),
304                ) {
305                    fetcher.ready_to_fetch(nid, addr)
306                } else {
307                    fetcher
308                        .fetch_failed(nid, "Could not connect. At least one address is known but all attempts timed out.");
309                }
310            }
311        }
312        if let Some((nid, addr)) = fetcher.next_fetch() {
313            spinner.emit_fetching(&nid, &addr, &progress);
314            let result = node.fetch(
315                rid,
316                nid,
317                settings.timeout,
318                settings.signed_references_minimum_feature_level,
319            )?;
320            match fetcher.fetch_complete(nid, result) {
321                std::ops::ControlFlow::Continue(update) => {
322                    spinner.emit_progress(&update);
323                    progress = update
324                }
325                std::ops::ControlFlow::Break(success) => {
326                    spinner.finished(success.outcome());
327                    return Ok(sync::FetcherResult::TargetReached(success));
328                }
329            }
330        }
331    }
332    let result = fetcher.finish();
333    match &result {
334        sync::FetcherResult::TargetReached(success) => {
335            spinner.finished(success.outcome());
336        }
337        sync::FetcherResult::TargetError(missed) => spinner.failed(missed),
338    }
339    Ok(result)
340}
341
342// Try all addresses until one succeeds.
343// FIXME(fintohaps): I think this could return a `Result<node::Address,
344// Vec<AddressError>>` which could report back why each address failed
345fn connect(
346    nid: NodeId,
347    addrs: impl Iterator<Item = node::Address>,
348    timeout: time::Duration,
349    node: &mut Node,
350    spinner: &mut FetcherSpinner,
351    progress: &sync::fetch::Progress,
352) -> Option<node::Address> {
353    for addr in addrs {
354        spinner.emit_dialing(&nid, &addr, progress);
355        let cr = node.connect(
356            nid,
357            addr.clone(),
358            node::ConnectOptions {
359                persistent: false,
360                timeout,
361            },
362        );
363
364        match cr {
365            Ok(node::ConnectResult::Connected) => {
366                return Some(addr);
367            }
368            Ok(node::ConnectResult::Disconnected { .. }) => {
369                continue;
370            }
371            Err(e) => {
372                log::warn!(target: "cli", "Failed to connect to {nid}@{addr}: {e}");
373                continue;
374            }
375        }
376    }
377    None
378}
379
380fn sort_seeds_by(local: NodeId, seeds: &mut [Seed], aliases: &impl AliasStore, sort_by: &SortBy) {
381    let compare = |a: &Seed, b: &Seed| match sort_by {
382        SortBy::Nid => a.nid.cmp(&b.nid),
383        SortBy::Alias => {
384            let a = aliases.alias(&a.nid);
385            let b = aliases.alias(&b.nid);
386            a.cmp(&b)
387        }
388        SortBy::Status => match (&a.sync, &b.sync) {
389            (Some(_), None) => Ordering::Less,
390            (None, Some(_)) => Ordering::Greater,
391            (Some(a), Some(b)) => a.cmp(b).reverse(),
392            (None, None) => Ordering::Equal,
393        },
394    };
395
396    // Always show our local node first.
397    seeds.sort_by(|a, b| {
398        if a.nid == local {
399            Ordering::Less
400        } else if b.nid == local {
401            Ordering::Greater
402        } else {
403            compare(a, b)
404        }
405    });
406}
407
408struct FetcherSpinner {
409    preferred_seeds: usize,
410    replicas: sync::ReplicationFactor,
411    spinner: term::Spinner,
412}
413
414impl FetcherSpinner {
415    fn new(target: &sync::fetch::Target, progress: &sync::fetch::Progress) -> Self {
416        let preferred_seeds = target.preferred_seeds().len();
417        let replicas = target.replicas();
418        let spinner = term::spinner(format!(
419            "{} of {} preferred seeds, and {} of at least {} total seeds.",
420            term::format::secondary(progress.preferred()),
421            term::format::secondary(preferred_seeds),
422            term::format::secondary(progress.succeeded()),
423            term::format::secondary(replicas.lower_bound())
424        ));
425        Self {
426            preferred_seeds: target.preferred_seeds().len(),
427            replicas: *target.replicas(),
428            spinner,
429        }
430    }
431
432    fn emit_progress(&mut self, progress: &sync::fetch::Progress) {
433        self.spinner.message(format!(
434            "{} of {} preferred seeds, and {} of at least {} total seeds.",
435            term::format::secondary(progress.preferred()),
436            term::format::secondary(self.preferred_seeds),
437            term::format::secondary(progress.succeeded()),
438            term::format::secondary(self.replicas.lower_bound()),
439        ))
440    }
441
442    fn emit_fetching(
443        &mut self,
444        node: &NodeId,
445        addr: &node::Address,
446        progress: &sync::fetch::Progress,
447    ) {
448        self.spinner.message(format!(
449            "{} of {} preferred seeds, and {} of at least {} total seeds… [fetch {}@{}]",
450            term::format::secondary(progress.preferred()),
451            term::format::secondary(self.preferred_seeds),
452            term::format::secondary(progress.succeeded()),
453            term::format::secondary(self.replicas.lower_bound()),
454            term::format::tertiary(term::format::node_id_human_compact(node)),
455            term::format::tertiary(addr.display_compact()),
456        ))
457    }
458
459    fn emit_dialing(
460        &mut self,
461        node: &NodeId,
462        addr: &node::Address,
463        progress: &sync::fetch::Progress,
464    ) {
465        self.spinner.message(format!(
466            "{} of {} preferred seeds, and {} of at least {} total seeds… [dial {}@{}]",
467            term::format::secondary(progress.preferred()),
468            term::format::secondary(self.preferred_seeds),
469            term::format::secondary(progress.succeeded()),
470            term::format::secondary(self.replicas.lower_bound()),
471            term::format::tertiary(term::format::node_id_human_compact(node)),
472            term::format::tertiary(addr.display_compact()),
473        ))
474    }
475
476    fn finished(mut self, outcome: &SuccessfulOutcome) {
477        match outcome {
478            SuccessfulOutcome::PreferredNodes { preferred } => {
479                self.spinner.message(format!(
480                    "Target met: {} preferred seed(s).",
481                    term::format::positive(preferred),
482                ));
483            }
484            SuccessfulOutcome::MinReplicas { succeeded, .. } => {
485                self.spinner.message(format!(
486                    "Target met: {} seed(s)",
487                    term::format::positive(succeeded)
488                ));
489            }
490            SuccessfulOutcome::MaxReplicas {
491                succeeded,
492                min,
493                max,
494            } => {
495                self.spinner.message(format!(
496                    "Target met: {} of {} min and {} max seed(s)",
497                    succeeded,
498                    term::format::secondary(min),
499                    term::format::secondary(max)
500                ));
501            }
502        }
503        self.spinner.finish()
504    }
505
506    fn failed(mut self, missed: &sync::fetch::TargetMissed) {
507        let mut message = "Target not met: ".to_string();
508        let missing_preferred_seeds = missed
509            .missed_nodes()
510            .iter()
511            .map(|nid| term::format::node_id_human(nid).to_string())
512            .collect::<Vec<_>>();
513        let required = missed.required_nodes();
514        if !missing_preferred_seeds.is_empty() {
515            message.push_str(&format!(
516                "could not fetch from [{}], and required {} more seed(s)",
517                missing_preferred_seeds.join(", "),
518                required
519            ));
520        } else {
521            message.push_str(&format!("required {required} more seed(s)"));
522        }
523        self.spinner.message(message);
524        self.spinner.failed();
525    }
526}
527
528fn display_fetch_result(result: &sync::FetcherResult, verbose: bool) {
529    match result {
530        sync::FetcherResult::TargetReached(success) => {
531            let progress = success.progress();
532            let results = success.fetch_results();
533            display_success(results.success(), verbose);
534            let failed = progress.failed();
535            if failed > 0 && verbose {
536                term::warning(format!("Failed to fetch from {failed} seed(s)."));
537                for (node, reason) in results.failed() {
538                    term::warning(format!(
539                        "{}: {}",
540                        term::format::node_id_human(node),
541                        term::format::yellow(reason),
542                    ))
543                }
544            }
545        }
546        sync::FetcherResult::TargetError(failed) => {
547            let results = failed.fetch_results();
548            let progress = failed.progress();
549            let target = failed.target();
550            let succeeded = progress.succeeded();
551            let missed = failed.missed_nodes();
552            term::error(format!(
553                "Fetched from {} preferred seed(s), could not reach {} seed(s)",
554                succeeded,
555                target.replicas().lower_bound(),
556            ));
557            term::error(format!(
558                "Could not replicate from {} preferred seed(s)",
559                missed.len()
560            ));
561            for (node, reason) in results.failed() {
562                term::error(format!(
563                    "{}: {}",
564                    term::format::node_id_human(node),
565                    term::format::negative(reason),
566                ))
567            }
568            if succeeded > 0 {
569                term::info!("Successfully fetched from the following seeds:");
570                display_success(results.success(), verbose)
571            }
572        }
573    }
574}
575
576fn display_success<'a>(
577    results: impl Iterator<Item = (&'a NodeId, &'a [RefUpdate], HashSet<NodeId>)>,
578    verbose: bool,
579) {
580    for (node, updates, _) in results {
581        term::println_prefixed(
582            "🌱 Fetched from",
583            term::format::secondary(term::format::node_id_human(node)),
584        );
585        if verbose {
586            let mut updates = updates
587                .iter()
588                .filter(|up| !matches!(up, RefUpdate::Skipped { .. }))
589                .peekable();
590            if updates.peek().is_none() {
591                term::indented(term::format::italic("no references were updated"));
592            } else {
593                for update in updates {
594                    term::indented(term::format::ref_update_verbose(update))
595                }
596            }
597        }
598    }
599}
600
601fn print_announcer_result(result: &sync::AnnouncerResult, verbose: bool) {
602    use sync::announce::SuccessfulOutcome::*;
603    match result {
604        sync::AnnouncerResult::Success(success) if verbose => {
605            // N.b. Printing how many seeds were synced with is printed
606            // elsewhere
607            match success.outcome() {
608                MinReplicationFactor { preferred, synced }
609                | MaxReplicationFactor { preferred, synced }
610                | PreferredNodes {
611                    preferred,
612                    total_nodes_synced: synced,
613                } => {
614                    if preferred == 0 {
615                        term::success!("Synced {} seed(s)", term::format::positive(synced));
616                    } else {
617                        term::success!(
618                            "Synced {} preferred seed(s) and {} total seed(s)",
619                            term::format::positive(preferred),
620                            term::format::positive(synced)
621                        );
622                    }
623                }
624            }
625            print_synced(success.synced());
626        }
627        sync::AnnouncerResult::Success(_) => {
628            // Successes are ignored when `!verbose`.
629        }
630        sync::AnnouncerResult::TimedOut(result) => {
631            if result.synced().is_empty() {
632                term::error("All seeds timed out, use `rad sync -v` to see the list of seeds");
633                return;
634            }
635            let timed_out = result.timed_out();
636            term::warning(format!(
637                "{} seed(s) timed out, use `rad sync -v` to see the list of seeds",
638                timed_out.len(),
639            ));
640            if verbose {
641                print_synced(result.synced());
642                for node in timed_out {
643                    term::warning(format!("{} timed out", term::format::node_id_human(node)));
644                }
645            }
646        }
647        sync::AnnouncerResult::NoNodes(result) => {
648            term::info!("Announcement could not sync with anymore seeds.");
649            if verbose {
650                print_synced(result.synced())
651            }
652        }
653    }
654}
655
656fn print_synced(synced: &BTreeMap<NodeId, sync::announce::SyncStatus>) {
657    for (node, status) in synced.iter() {
658        let mut message = format!("🌱 Synced with {}", term::format::node_id_human(node));
659
660        match status {
661            sync::announce::SyncStatus::AlreadySynced => {
662                message.push_str(&format!("{}", term::format::dim(" (already in sync)")));
663            }
664            sync::announce::SyncStatus::Synced { duration } => {
665                message.push_str(&format!(
666                    "{}",
667                    term::format::dim(format!(" in {}s", duration.as_secs()))
668                ));
669            }
670        }
671        term::info!("{}", message);
672    }
673}