use core::time;
use std::collections::BTreeSet;
use std::io::Write;
use radicle::node::sync;
use radicle::node::{Handle as _, NodeId};
use radicle::storage::{refs, ReadRepository, RepositoryError};
use radicle::{Node, Profile};
use crate::terminal as term;
pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncSettings {
pub replicas: sync::ReplicationFactor,
pub seeds: BTreeSet<NodeId>,
pub timeout: time::Duration,
pub signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
}
impl SyncSettings {
#[must_use]
pub fn timeout(mut self, timeout: time::Duration) -> Self {
self.timeout = timeout;
self
}
#[must_use]
pub fn minimum_feature_level(mut self, feature_level: Option<refs::FeatureLevel>) -> Self {
self.signed_references_minimum_feature_level = feature_level;
self
}
#[must_use]
pub fn replicas(mut self, replicas: sync::ReplicationFactor) -> Self {
self.replicas = replicas;
self
}
pub fn seeds(mut self, seeds: impl IntoIterator<Item = NodeId>) -> Self {
self.seeds = seeds.into_iter().collect();
self
}
#[must_use]
pub fn with_profile(mut self, profile: &Profile) -> Self {
if self.seeds.is_empty() {
self.seeds = profile
.config
.preferred_seeds
.iter()
.map(|p| p.id)
.collect();
}
self.seeds.remove(profile.id());
self
}
}
impl Default for SyncSettings {
fn default() -> Self {
Self {
replicas: sync::ReplicationFactor::default(),
seeds: BTreeSet::new(),
timeout: DEFAULT_SYNC_TIMEOUT,
signed_references_minimum_feature_level: None,
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum SyncError {
#[error(transparent)]
Repository(#[from] RepositoryError),
#[error(transparent)]
Node(#[from] radicle::node::Error),
#[error("all seeds timed out")]
AllSeedsTimedOut,
#[error(transparent)]
Target(#[from] sync::announce::TargetError),
}
impl SyncError {
fn is_connection_err(&self) -> bool {
match self {
Self::Node(e) => e.is_connection_err(),
Self::Repository(_) | Self::AllSeedsTimedOut | Self::Target(_) => false,
}
}
}
pub struct SyncReporting {
pub progress: term::PaintTarget,
pub completion: term::PaintTarget,
pub debug: bool,
}
impl Default for SyncReporting {
fn default() -> Self {
Self {
progress: term::PaintTarget::Stderr,
completion: term::PaintTarget::Stdout,
debug: false,
}
}
}
pub fn announce<R: ReadRepository>(
repo: &R,
settings: SyncSettings,
reporting: SyncReporting,
node: &mut Node,
profile: &Profile,
) -> Result<Option<sync::AnnouncerResult>, SyncError> {
match announce_(repo, settings, reporting, node, profile) {
Ok(result) => Ok(result),
Err(e) if e.is_connection_err() => {
term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
Ok(None)
}
Err(e) => Err(e),
}
}
fn announce_<R>(
repo: &R,
settings: SyncSettings,
reporting: SyncReporting,
node: &mut Node,
profile: &Profile,
) -> Result<Option<sync::AnnouncerResult>, SyncError>
where
R: ReadRepository,
{
let me = profile.id();
let rid = repo.id();
let doc = repo.identity_doc()?;
let settings = settings.with_profile(profile);
let n_preferred_seeds = settings.seeds.len();
let config = match sync::PrivateNetwork::private_repo(&doc) {
None => {
let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
(BTreeSet::new(), BTreeSet::new()),
|(mut synced, mut unsynced), seed| {
if seed.is_synced() {
synced.insert(seed.nid);
} else {
unsynced.insert(seed.nid);
}
(synced, unsynced)
},
);
sync::AnnouncerConfig::public(*me, settings.replicas, settings.seeds, synced, unsynced)
}
Some(network) => {
let sessions = node.sessions()?;
let network =
network.restrict(|nid| sessions.iter().any(|s| s.nid == *nid && s.is_connected()));
sync::AnnouncerConfig::private(*me, settings.replicas, network)
}
};
let announcer = match sync::Announcer::new(config) {
Ok(announcer) => announcer,
Err(err) => match err {
sync::AnnouncerError::AlreadySynced(result) => {
term::success!(
&mut reporting.completion.writer();
"Nothing to announce, already in sync with {} seed(s) (see `rad sync status`)",
term::format::positive(result.synced()),
);
return Ok(None);
}
sync::AnnouncerError::NoSeeds => {
term::info!(
&mut reporting.completion.writer();
"{}",
term::format::yellow(format!("No seeds found for {rid}."))
);
return Ok(None);
}
sync::AnnouncerError::Target(err) => return Err(err.into()),
},
};
let target = announcer.target();
let min_replicas = target.replicas().lower_bound();
let mut spinner = term::spinner_to(
format!("Found {} seed(s)..", announcer.progress().unsynced()),
reporting.progress.clone(),
reporting.completion.clone(),
);
match node.announce(
rid,
[profile.did().into()],
settings.timeout,
announcer,
|node, progress| {
spinner.message(format!(
"Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
term::format::node_id_human_compact(node),
term::format::secondary(progress.preferred()),
term::format::secondary(n_preferred_seeds),
term::format::secondary(progress.synced()),
term::format::secondary(min_replicas.max(progress.synced())),
));
},
) {
Ok(result) => {
spinner.message(format!(
"Synced with {} seed(s)",
term::format::positive(result.synced().len())
));
spinner.finish();
Ok(Some(result))
}
Err(err) => {
spinner.error(format!("Sync failed: {err}"));
Err(err.into())
}
}
}