use std::{path::PathBuf, time::Duration};
use anyhow::{Error, Result};
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use stigmerge_peer::{
fetcher::{self},
is_cancelled, is_lagged, new_connection, CancelError,
};
use tokio::{fs::File, io::AsyncWriteExt, select, spawn, sync::broadcast, task::JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use veilnet::{connection::StateAttachmentWatcher, Connection};
use crate::{
cli::Commands,
info::share_info,
initialize_stdout_logging, initialize_ui_logging,
share::{Event, Share},
Cli,
};
pub struct App {
cli: Cli,
multi_progress: MultiProgress,
}
impl App {
pub fn new(cli: Cli) -> Result<App> {
let no_ui = cli.no_ui();
Ok(App {
cli,
multi_progress: MultiProgress::with_draw_target(if no_ui {
ProgressDrawTarget::hidden()
} else {
ProgressDrawTarget::stderr()
}),
})
}
pub async fn run(&mut self) -> Result<()> {
self.multi_progress
.println(format!("🐝 stigmerge {}", env!("CARGO_PKG_VERSION")))?;
if self.cli.version() {
return Ok(());
}
if self.cli.no_ui() {
initialize_stdout_logging()
} else {
initialize_ui_logging(self.multi_progress.clone());
}
let state_dir = self.cli.state_dir()?;
debug!(state_dir);
let conn = new_connection(state_dir.to_string().as_str(), None).await?;
let cancel = CancellationToken::new();
let ctrl_c_handler = {
let cancel = cancel.clone();
let conn = conn.clone();
spawn(async move {
select! {
_ = cancel.cancelled() => {}
_ = tokio::signal::ctrl_c() => {
info!("Received ctrl-c, shutting down...");
}
}
cancel.cancel();
conn.close().await?;
trace!("Ctrl-C handler completed");
Ok::<(), Error>(())
})
};
if let Commands::Info { share_key, path } = &self.cli.commands {
share_info(cancel, conn, share_key, path).await?;
return Ok(());
}
let res = self.run_with_connection(cancel, conn.clone()).await;
trace!(?res, "run_with_connection completed");
if let Err(err) = ctrl_c_handler.await {
warn!(?err);
}
if let Err(err) = res {
if !is_cancelled(&err) {
error!(?err);
} else {
error!(%err);
}
return Err(err);
}
Ok(())
}
async fn run_with_connection<C: veilnet::Connection + Clone + Send + Sync + 'static>(
&self,
cancel: CancellationToken,
conn: C,
) -> Result<()> {
let conn_progress_bar = self.multi_progress.add(ProgressBar::new_spinner());
conn_progress_bar.set_message("Connected to Veilid network");
conn_progress_bar.set_prefix("📶");
conn_progress_bar.disable_steady_tick();
conn_progress_bar.set_style(ProgressStyle::with_template("{prefix} {msg}")?);
let (attachment_watcher, mut attachment_rx) = StateAttachmentWatcher::new();
conn.add_update_handler(Box::new(attachment_watcher));
let share_mode = self.cli.commands.share_args()?;
let mut share = Share::new(conn, share_mode)?;
{
let cancel = cancel.clone();
share.tasks.spawn(async move {
loop {
select! {
_ = cancel.cancelled() => {
return Err(CancelError.into());
}
res = attachment_rx.changed() => {
res?;
let state = attachment_rx.borrow_and_update();
if state.public_internet_ready {
conn_progress_bar.disable_steady_tick();
conn_progress_bar.set_style(ProgressStyle::with_template("{prefix} {msg}")?);
conn_progress_bar.set_message("Connected to Veilid network");
} else {
conn_progress_bar.enable_steady_tick(Duration::from_millis(100));
conn_progress_bar.set_style(ProgressStyle::with_template("{spinner} {msg}")?);
conn_progress_bar.set_message("Disconnected from Veilid network");
}
}
}
}
});
}
{
let events_rx = share.subscribe_events();
self.add_state_file_handler(&cancel, &mut share.tasks, events_rx)?;
}
{
let events_rx = share.subscribe_events();
self.add_fetch_progress(&cancel, &mut share.tasks, events_rx)?;
}
{
let events_rx = share.subscribe_events();
self.add_seed_indexer_progress(&cancel, &mut share.tasks, events_rx)?;
}
share.start(cancel.clone()).await?;
share.join().await?;
Ok(())
}
fn add_fetch_progress(
&self,
cancel: &CancellationToken,
tasks: &mut JoinSet<Result<()>>,
mut events: broadcast::Receiver<Event>,
) -> Result<()> {
let fetch_progress = self.multi_progress.add(ProgressBar::new_spinner());
fetch_progress.set_style(ProgressStyle::with_template(
"{msg} {wide_bar} {binary_bytes}/{binary_total_bytes}",
)?);
let verify_progress = self.multi_progress.add(ProgressBar::new_spinner());
verify_progress.set_style(ProgressStyle::with_template("{msg} {bar:40} {pos}/{len}")?);
let progress_cancel = cancel.clone();
tasks.spawn(async move {
loop {
select! {
_ = progress_cancel.cancelled() => {
return Err(CancelError.into());
}
res = events.recv() => {
if is_lagged(&res) {
warn!("events channel lagged in fetch progress handler");
continue;
}
let fetcher_status = match res? {
Event::FetcherStatus(status) => status,
_ => continue,
};
match fetcher_status {
fetcher::Status::IndexProgress { position, length } => {
fetch_progress.set_message("Indexing");
fetch_progress.set_position(position);
fetch_progress.set_length(length);
}
fetcher::Status::DigestProgress { position, length } => {
fetch_progress.set_message("Comparing");
fetch_progress.set_position(position);
fetch_progress.set_length(length);
}
fetcher::Status::FetchProgress { fetch_position, fetch_length, verify_position, verify_length } => {
fetch_progress.set_message("Fetching ");
fetch_progress.set_position(if fetch_position > 0 { fetch_position.try_into().unwrap() } else { 0 });
fetch_progress.set_length(fetch_length);
verify_progress.set_message("Verifying");
verify_progress.set_position(verify_position);
verify_progress.set_length(verify_length);
}
fetcher::Status::Done => {
fetch_progress.finish_with_message("Fetch complete");
verify_progress.finish_and_clear();
return Ok(());
}
_ => {}
}
}
}
}
});
Ok(())
}
fn add_seed_indexer_progress(
&self,
cancel: &CancellationToken,
tasks: &mut JoinSet<Result<()>>,
mut events: broadcast::Receiver<Event>,
) -> Result<()> {
let indexer_progress = self.multi_progress.add(ProgressBar::new_spinner());
indexer_progress.set_style(ProgressStyle::with_template(
"{wide_bar} {binary_bytes}/{binary_total_bytes}",
)?);
let verifier_progress = self.multi_progress.add(ProgressBar::new_spinner());
verifier_progress.set_style(ProgressStyle::with_template(
"{wide_bar} {binary_bytes}/{binary_total_bytes}",
)?);
let progress_cancel = cancel.clone();
tasks.spawn(async move {
loop {
select! {
_ = progress_cancel.cancelled() => {
return Err(CancelError.into());
}
res = events.recv() => {
if is_lagged(&res) {
warn!("events channel lagged in seed indexer progress handler");
continue;
}
let (index_progress, verify_progress) = match res? {
Event::SeederLoading{ index_progress, verify_progress } => {
(index_progress, verify_progress)
}
_ => continue,
};
if index_progress.length == index_progress.position {
indexer_progress.finish_and_clear();
} else {
indexer_progress.set_message("Indexing");
indexer_progress.set_length(index_progress.length);
indexer_progress.set_position(index_progress.position);
}
if verify_progress.length == verify_progress.position {
verifier_progress.finish_and_clear();
return Ok(())
}
verifier_progress.set_message("Verifying");
verifier_progress.set_length(verify_progress.length);
verifier_progress.set_position(verify_progress.position);
}
}
}
});
Ok(())
}
fn add_state_file_handler(
&self,
cancel: &CancellationToken,
tasks: &mut JoinSet<std::result::Result<(), Error>>,
mut events: broadcast::Receiver<Event>,
) -> Result<()> {
let handler_cancel = cancel.clone();
let state_dir = self.cli.state_dir()?;
let seed_progress = self.multi_progress.add(ProgressBar::new_spinner());
tasks.spawn(async move {
loop {
select! {
_ = handler_cancel.cancelled() => {
return Err(CancelError.into());
}
res = events.recv() => {
if is_lagged(&res) {
warn!("events channel lagged in state file handler");
continue;
}
let share_info = match res? {
Event::ShareInfo(share_info) => share_info,
_ => continue,
};
Self::write_state_file(&state_dir, "index_digest",
hex::encode(share_info.want_index_digest)).await?;
Self::write_state_file(&state_dir, "share_key",
share_info.key.to_string()).await?;
seed_progress.set_style(ProgressStyle::with_template("{prefix} {msg}")?);
seed_progress.set_prefix("🌱");
seed_progress.set_message(format!(
"Seeding {}{} to {}",
share_info.want_index
.files()
.first()
.map(|f| f.path().to_string_lossy())
.unwrap(),
if share_info.want_index.files().len() > 1 { "..." } else { "" },
share_info.key
));
}
}
}
});
Ok(())
}
async fn write_state_file(state_dir: &str, key: &str, value: String) -> Result<()> {
let state_file = PathBuf::from(state_dir).join(key);
File::create(state_file)
.await?
.write_all(value.as_bytes())
.await?;
Ok(())
}
}