use core::time;
use std::collections::BTreeSet;
use std::io;
use std::io::Write;
use std::ops::ControlFlow;
use radicle::node::{self, AnnounceResult};
use radicle::node::{Handle as _, NodeId};
use radicle::storage::{ReadRepository, RepositoryError};
use radicle::{Node, Profile};
use radicle_term::format;
use crate::terminal as term;
/// Default time to wait for seeds to sync: nine seconds.
///
/// This is the `timeout` value produced by `SyncSettings::default()`.
pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
/// Settings that control how an announcement is synced with seeds.
///
/// The `Default` implementation yields 3 replicas, an empty seed set and
/// [`DEFAULT_SYNC_TIMEOUT`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncSettings {
/// Target number of replicas to sync with. When announcing, this is capped
/// to the number of nodes that can actually be announced to.
pub replicas: usize,
/// Specific seeds to sync with. When empty, `with_profile` substitutes the
/// profile's preferred seeds.
pub seeds: BTreeSet<NodeId>,
/// Timeout passed to the node when announcing.
pub timeout: time::Duration,
}
impl SyncSettings {
    /// Returns the settings with the sync timeout replaced by `timeout`.
    pub fn timeout(self, timeout: time::Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Returns the settings with the target replica count replaced by `replicas`.
    pub fn replicas(self, replicas: usize) -> Self {
        Self { replicas, ..self }
    }

    /// Completes the settings using `profile`.
    ///
    /// If no seeds were given, the profile's preferred seeds are used. In all
    /// cases the profile's own id is removed from the seed set afterwards.
    pub fn with_profile(mut self, profile: &Profile) -> Self {
        if self.seeds.is_empty() {
            // The set is empty here, so extending it is the same as replacing it.
            let preferred = profile.config.preferred_seeds.iter().map(|seed| seed.id);
            self.seeds.extend(preferred);
        }
        // Never treat ourselves as a seed to sync with.
        self.seeds.remove(profile.id());
        self
    }
}
impl Default for SyncSettings {
    /// Three replicas, no explicit seeds, and the default sync timeout.
    fn default() -> Self {
        Self {
            timeout: DEFAULT_SYNC_TIMEOUT,
            seeds: BTreeSet::default(),
            replicas: 3,
        }
    }
}
/// Errors that can be returned when announcing to the network.
#[derive(thiserror::Error, Debug)]
pub enum SyncError {
/// A repository error; converted automatically from [`RepositoryError`].
#[error(transparent)]
Repository(#[from] RepositoryError),
/// A node error; converted automatically from `radicle::node::Error`.
#[error(transparent)]
Node(#[from] radicle::node::Error),
/// The announcement finished without a single node having synced.
#[error("all seeds timed out")]
AllSeedsTimedOut,
}
impl SyncError {
    /// Returns `true` only if this is a node error that itself reports being
    /// a connection error; every other variant yields `false`.
    fn is_connection_err(&self) -> bool {
        match self {
            Self::Node(err) => err.is_connection_err(),
            Self::Repository(_) | Self::AllSeedsTimedOut => false,
        }
    }
}
/// Destination for sync output: standard output, standard error, or nowhere.
#[derive(Debug)]
pub enum SyncWriter {
    /// Write to the process's standard output.
    Stdout(io::Stdout),
    /// Write to the process's standard error.
    Stderr(io::Stderr),
    /// Discard everything written.
    Sink,
}

impl Clone for SyncWriter {
    /// Produces a writer of the same variant, obtaining a fresh handle to the
    /// standard stream where one is needed.
    fn clone(&self) -> Self {
        match *self {
            Self::Sink => Self::Sink,
            Self::Stderr(_) => Self::Stderr(io::stderr()),
            Self::Stdout(_) => Self::Stdout(io::stdout()),
        }
    }
}

impl SyncWriter {
    /// Returns the underlying stream, or `None` for [`SyncWriter::Sink`].
    fn stream(&mut self) -> Option<&mut dyn io::Write> {
        match self {
            Self::Stdout(out) => Some(out),
            Self::Stderr(err) => Some(err),
            Self::Sink => None,
        }
    }
}

impl io::Write for SyncWriter {
    /// Forwards `buf` to the underlying stream. The sink reports the whole
    /// buffer as written without doing anything.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self.stream() {
            Some(stream) => stream.write(buf),
            None => Ok(buf.len()),
        }
    }

    /// Flushes the underlying stream. Flushing the sink always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        match self.stream() {
            Some(stream) => stream.flush(),
            None => Ok(()),
        }
    }
}
/// Configures where sync output is written and how verbose it is.
///
/// The `Default` implementation sends progress to standard error and
/// completion messages to standard output, with `debug` disabled.
pub struct SyncReporting {
/// Writer handed to the spinner together with `completion`.
pub progress: SyncWriter,
/// Writer for informational, success and notice messages, and for the
/// per-seed lines printed when `debug` is set.
pub completion: SyncWriter,
/// When `true`, a line is printed for every seed that synced, including
/// how long it took.
pub debug: bool,
}
impl Default for SyncReporting {
    /// Progress goes to standard error, completion messages to standard
    /// output, and debug output is off.
    fn default() -> Self {
        let progress = SyncWriter::Stderr(io::stderr());
        let completion = SyncWriter::Stdout(io::stdout());
        Self {
            progress,
            completion,
            debug: false,
        }
    }
}
/// Announces the repository to the network and waits for seeds to sync.
///
/// A connection error is not treated as a failure: a hint telling the user
/// how to start the node is printed and a default [`AnnounceResult`] is
/// returned.
///
/// # Errors
///
/// Returns any [`SyncError`] that is not a connection error.
pub fn announce<R: ReadRepository>(
    repo: &R,
    settings: SyncSettings,
    reporting: SyncReporting,
    node: &mut Node,
    profile: &Profile,
) -> Result<AnnounceResult, SyncError> {
    announce_(repo, settings, reporting, node, profile).or_else(|err| {
        if !err.is_connection_err() {
            return Err(err);
        }
        term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
        Ok(AnnounceResult::default())
    })
}
/// Announces the repository to the nodes that still need it and reports progress.
///
/// Steps, in order:
/// 1. Complete `settings` with `profile` via `SyncSettings::with_profile`.
/// 2. Pick the nodes to announce to. For a public repository these are the
///    known seeds that are not synced, excluding the profile's own id.
///    Otherwise they are the connected sessions the identity document is
///    visible to.
/// 3. Return a default `AnnounceResult` early if there are no seeds, if the
///    sync target is already met, or if there is nobody to announce to.
/// 4. Call `node.announce` and stop once the replica target is reached and,
///    if specific seeds were requested, at least one of them has synced.
///
/// # Errors
///
/// Returns `SyncError::AllSeedsTimedOut` if the announcement finished without
/// any node having synced. Repository and node errors are propagated.
fn announce_<R: ReadRepository>(
repo: &R,
settings: SyncSettings,
mut reporting: SyncReporting,
node: &mut Node,
profile: &Profile,
) -> Result<AnnounceResult, SyncError> {
let rid = repo.id();
let doc = repo.identity_doc()?;
// Fill in preferred seeds if none were given, and drop our own id from the set.
let mut settings = settings.with_profile(profile);
let unsynced: Vec<_> = if doc.visibility.is_public() {
// Every seed the node knows about for this repository.
let all = node.seeds(rid)?;
if all.is_empty() {
term::info!(&mut reporting.completion; "No seeds found for {rid}.");
return Ok(AnnounceResult::default());
}
// Ids of the seeds that report being synced.
let synced = all
.iter()
.filter(|s| s.is_synced())
.map(|s| s.nid)
.collect::<BTreeSet<_>>();
// Synced seeds, not counting the profile's own id.
let replicas = synced.iter().filter(|nid| *nid != profile.id()).count();
// Upper bound on the replica count: all known seeds except the profile's own id.
let max_replicas = all.iter().filter(|s| &s.nid != profile.id()).count();
// True when every requested seed is already synced (vacuously true if none were requested).
let is_seeds_synced = settings.seeds.iter().all(|s| synced.contains(s));
// The target is capped at `max_replicas`, since more than that can never be reached.
let is_replicas_synced = replicas >= settings.replicas.min(max_replicas);
// Both conditions hold: there is nothing left to do.
if is_seeds_synced && is_replicas_synced {
term::success!(
&mut reporting.completion;
"Nothing to announce, already in sync with {replicas} node(s) (see `rad sync status`)"
);
return Ok(AnnounceResult::default());
}
// Announce to every seed that is not synced, except ourselves.
all.iter()
.filter(|s| !s.is_synced() && &s.nid != profile.id())
.map(|s| s.nid)
.collect()
} else {
// Not public: only connected sessions that the identity document is visible to.
node.sessions()?
.into_iter()
.filter(|s| s.state.is_connected() && doc.is_visible_to(&s.nid))
.map(|s| s.nid)
.collect()
};
if unsynced.is_empty() {
term::info!(&mut reporting.completion; "No seeds to announce to for {rid}. (see `rad sync status`)");
return Ok(AnnounceResult::default());
}
// Cap the target at the number of nodes we are about to announce to.
settings.replicas = settings.replicas.min(unsynced.len());
let mut spinner = term::spinner_to(
format!("Found {} seed(s)..", unsynced.len()),
reporting.completion.clone(),
reporting.progress.clone(),
);
// NOTE(review): `replicas` in the callback is presumably the map of nodes synced
// so far during this announcement; it is only used via `len` and `contains_key`.
let result = node.announce(
rid,
unsynced,
settings.timeout,
|event, replicas| match event {
node::AnnounceEvent::Announced => ControlFlow::Continue(()),
node::AnnounceEvent::RefsSynced { remote, time } => {
spinner.message(format!(
"Synced with {} in {}..",
format::dim(remote),
format::dim(format!("{time:?}"))
));
// Stop once the replica target is met and either no specific seeds
// were requested or at least one requested seed is among `replicas`.
if replicas.len() >= settings.replicas
&& (settings.seeds.is_empty()
|| settings.seeds.iter().any(|s| replicas.contains_key(s)))
{
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
}
},
)?;
if result.synced.is_empty() {
spinner.failed();
} else {
spinner.message(format!("Synced with {} node(s)", result.synced.len()));
spinner.finish();
if reporting.debug {
for (seed, time) in &result.synced {
// Best-effort output: a failed write is deliberately ignored.
writeln!(
&mut reporting.completion,
" {}",
term::format::dim(format!("Synced with {seed} in {time:?}")),
)
.ok();
}
}
}
// Only report timeouts for seeds that were explicitly requested.
for seed in &result.timed_out {
if settings.seeds.contains(seed) {
term::notice!(&mut reporting.completion; "Seed {seed} timed out..");
}
}
// Nothing synced at all: surface this as an error after reporting.
if result.synced.is_empty() {
return Err(SyncError::AllSeedsTimedOut);
}
Ok(result)
}