use std::{ops::Deref, path::PathBuf, sync::Arc};
use anyhow::{Error, Result};
use path_absolutize::Absolutize;
use stigmerge_fileindex::{Indexer, Progress};
use stigmerge_peer::{
content_addressable::ContentAddressable,
fetcher,
peer_gossip::PeerGossip,
piece_verifier,
proto::Digest,
record::{StableHaveMap, StableShareRecord},
seeder, share_announcer, share_resolver,
types::{LocalShareInfo, RemoteShareInfo},
CancelError, Retry,
};
use tokio::{
select,
sync::{broadcast, watch, RwLock},
task::JoinSet,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};
use veilid_core::RecordKey;
use veilnet::{
connection::{RoutingContext, API},
Connection,
};
/// Operating mode of a [`Share`]: either seeding a local file or fetching a
/// share published by remote peers.
#[derive(Debug)]
pub enum Mode {
/// Seed a local file.
Seed {
/// Path of the file to index and seed. The parent directory of its
/// absolutized form is used as the share root.
path: PathBuf,
},
/// Fetch a share from one or more remote share records.
Fetch {
/// Directory used as the share root when resolving and reading the
/// remote index.
root: PathBuf,
/// Optional index digest the remote index must match; it is
/// hex-decoded into 32 bytes before comparison. Remote shares whose
/// index digest differs are skipped with a warning.
want_index_digest: Option<Digest>,
/// Record keys of the remote shares to resolve.
share_keys: Vec<RecordKey>,
},
}
impl Mode {
pub fn share_keys(&self) -> &[RecordKey] {
match self {
Mode::Seed { .. } => &[],
Mode::Fetch { share_keys, .. } => share_keys,
}
}
}
/// Events broadcast by a [`Share`] to its subscribers.
#[derive(Clone, Debug)]
pub enum Event {
/// Information about the locally announced share, sent once the share
/// announcer has been created.
ShareInfo(Box<LocalShareInfo>),
/// Latest status reported by the fetcher (fetch mode).
FetcherStatus(fetcher::Status),
/// Progress of preparing a file for seeding (seed mode).
SeederLoading {
/// Most recent progress reported by the indexer's index-progress channel.
index_progress: stigmerge_fileindex::Progress,
/// Most recent progress reported by the indexer's digest-progress channel.
verify_progress: stigmerge_fileindex::Progress,
},
/// Sent in seed mode after every block of every piece has been registered
/// with the piece verifier.
SeederAvailable,
}
/// A single share being seeded or fetched over a connection `C`, together
/// with the background tasks that drive it.
pub struct Share<C: Connection> {
/// Connection handle; cloned into each spawned component.
conn: C,
/// Whether this share seeds a local file or fetches remote shares.
mode: Mode,
/// Retry policy handed to the spawned components.
retry: Retry,
/// Sender side of the event broadcast channel.
events_tx: broadcast::Sender<Event>,
/// Receiver created together with `events_tx`; `subscribe_events`
/// resubscribes from it to hand out new receivers.
events_rx: broadcast::Receiver<Event>,
/// Background tasks spawned by `start` and awaited by `join`.
pub(crate) tasks: JoinSet<Result<()>>,
}
/// Capacity of the broadcast channel used to publish [`Event`]s.
const SHARE_EVENTS_CAPACITY: usize = 131072;
impl<C: Connection + Clone + Send + Sync + 'static> Share<C> {
/// Creates a share for `conn` operating in `mode`.
///
/// Sets up the event broadcast channel, a default retry policy and an empty
/// task set. No tasks are spawned until `start` is called.
pub fn new(conn: C, mode: Mode) -> Result<Self> {
    let (events_tx, events_rx) = broadcast::channel(SHARE_EVENTS_CAPACITY);
    Ok(Self {
        conn,
        mode,
        retry: Retry::default(),
        events_tx,
        events_rx,
        tasks: JoinSet::new(),
    })
}
/// Returns a new receiver for this share's events.
///
/// The receiver only observes events sent after this call.
pub fn subscribe_events(&self) -> broadcast::Receiver<Event> {
    self.events_tx.subscribe()
}
pub async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let root = match self.mode {
Mode::Seed { ref path } => path
.absolutize()?
.parent()
.ok_or(Error::msg("cannot determine parent directory"))?
.to_path_buf(),
Mode::Fetch { ref root, .. } => root.to_owned(),
};
let (share_resolver, resolver_task) = share_resolver::ShareResolver::new_task(
cancel.clone(),
self.retry.clone(),
self.conn.clone(),
&root,
);
self.tasks.spawn(async move {
resolver_task.await??;
Ok(())
});
let mut want_index = None;
let mut remote_shares = vec![];
match &self.mode {
Mode::Fetch {
share_keys,
want_index_digest,
..
} => {
for share_key in share_keys.iter() {
let want_index_digest: Option<[u8; 32]> = match want_index_digest {
Some(digest_string) => {
let digest = hex::decode(digest_string)?;
Some(
digest
.try_into()
.map_err(|_| Error::msg("Invalid digest length"))?,
)
}
None => None,
};
let (_, header) =
StableShareRecord::new_remote(&mut self.conn, share_key).await?;
let mut index =
StableShareRecord::read_index(&mut self.conn, share_key, &header, &root)
.await?;
let remote_index_digest = index.digest()?;
if let Some(want_digest) = want_index_digest {
if remote_index_digest != want_digest {
warn!(
"remote share does not match wanted index digest: expected {}, got {}",
hex::encode(&want_digest[..]),
hex::encode(&remote_index_digest[..])
);
continue;
}
}
if let Err(err) = async {
let route_id = self
.conn
.routing_context()
.api()
.import_remote_private_route(header.route_data().to_vec())?;
let have_map = StableHaveMap::read_remote(
&mut self.conn,
header.have_map().unwrap().key(),
&index,
)
.await?;
remote_shares.push(RemoteShareInfo {
key: share_key.clone(),
header,
index: index.clone(),
index_digest: remote_index_digest,
route_id,
have_map,
});
Ok::<(), anyhow::Error>(())
}
.await
{
warn!(?err, ?share_key, "failed to resolve share");
continue;
}
want_index.get_or_insert(index);
}
}
Mode::Seed { path } => {
let indexer = Indexer::from_file(path).await?;
self.tasks.spawn(Self::send_indexer_progress(
cancel.clone(),
indexer.subscribe_index_progress(),
indexer.subscribe_digest_progress(),
self.events_tx.clone(),
));
let index = indexer.index().await?;
want_index.get_or_insert(index);
}
};
let index = want_index.ok_or(Error::msg("failed to resolve index"))?;
let share = {
let share_announcer = share_announcer::ShareAnnouncer::new(
cancel.clone(),
self.retry.clone(),
self.conn.clone(),
index.clone(),
)
.await?;
let share_info = share_announcer.share_info().await;
debug!(key = ?share_info.key, index_digest = hex::encode(share_info.want_index_digest));
{
let cancel = cancel.clone();
self.tasks.spawn(async move {
let res = share_announcer.run().await;
cancel.cancel();
res
});
}
self.events_tx
.send(Event::ShareInfo(Box::new(share_info.clone())))?;
share_info
};
let peer_gossip =
PeerGossip::new(self.conn.clone(), share.clone(), share_resolver.clone()).await?;
{
let retry = self.retry.clone();
let cancel = cancel.clone();
self.tasks.spawn(async move {
peer_gossip.run(cancel, retry).await.map_err(|err| {
error!(?err, "peer gossip task");
err
})
});
}
let shared_index = Arc::new(RwLock::new(index.clone()));
let piece_verifier = piece_verifier::PieceVerifier::new(shared_index.clone()).await;
let seeder =
seeder::Seeder::new(self.conn.clone(), share.clone(), piece_verifier.clone()).await?;
self.tasks
.spawn(seeder.run(cancel.clone(), self.retry.clone()));
match &self.mode {
Mode::Seed { .. } => {
for (piece_index, piece) in index.payload().pieces().iter().enumerate() {
for block_index in 0..piece.block_count() {
let piece_state = stigmerge_peer::types::PieceState::new(
0,
piece_index,
0,
piece.block_count(),
block_index,
);
piece_verifier.update_piece(piece_state).await?;
}
}
self.events_tx.send(Event::SeederAvailable)?;
}
Mode::Fetch { .. } => {
if remote_shares.is_empty() {
anyhow::bail!("failed to resolve initial shares");
}
let fetcher = fetcher::Fetcher::new(
self.conn.clone(),
share.clone(),
piece_verifier,
share_resolver.clone(),
remote_shares,
)
.await;
for share_key in self.mode.share_keys() {
share_resolver.add_share(share_key).await?;
}
self.tasks.spawn(Self::send_fetch_progress(
cancel.clone(),
fetcher.subscribe_fetcher_status(),
self.events_tx.clone(),
));
self.tasks
.spawn(fetcher.run(cancel.clone(), self.retry.clone()));
}
}
Ok(())
}
/// Waits for every spawned task to finish.
///
/// Returns the first error among the task results, in the order produced by
/// `JoinSet::join_all`, or `Ok(())` when every task succeeded.
pub async fn join(self) -> Result<()> {
    let outcomes = self.tasks.join_all().await;
    for outcome in outcomes {
        outcome?;
    }
    Ok(())
}
/// Forwards indexer progress to event subscribers as [`Event::SeederLoading`].
///
/// Each event carries the latest value seen on both progress channels.
/// Returns `Ok(())` once a digest-progress update reports
/// `length == position`, a `CancelError` when `cancel` fires, and an error if
/// either watch channel closes or the event cannot be sent.
async fn send_indexer_progress(
    cancel: CancellationToken,
    mut subscribe_index_progress: watch::Receiver<stigmerge_fileindex::Progress>,
    mut subscribe_digest_progress: watch::Receiver<stigmerge_fileindex::Progress>,
    events_tx: broadcast::Sender<Event>,
) -> Result<()> {
    let mut latest_index = Progress::default();
    let mut latest_verify = Progress::default();
    loop {
        select! {
            _ = cancel.cancelled() => {
                return Err(CancelError.into());
            }
            changed = subscribe_index_progress.changed() => {
                changed?;
                latest_index = *subscribe_index_progress.borrow_and_update();
                events_tx.send(Event::SeederLoading {
                    index_progress: latest_index,
                    verify_progress: latest_verify,
                })?;
            }
            changed = subscribe_digest_progress.changed() => {
                changed?;
                latest_verify = *subscribe_digest_progress.borrow_and_update();
                events_tx.send(Event::SeederLoading {
                    index_progress: latest_index,
                    verify_progress: latest_verify,
                })?;
                // Digest progress reaching its full length ends the task.
                if latest_verify.length == latest_verify.position {
                    return Ok(());
                }
            }
        }
    }
}
/// Forwards fetcher status changes to event subscribers as
/// [`Event::FetcherStatus`].
///
/// Returns `Ok(())` after forwarding a `fetcher::Status::Done` status, a
/// `CancelError` when `cancel` fires, and an error if the status channel
/// closes or the event cannot be sent.
async fn send_fetch_progress(
    cancel: CancellationToken,
    mut subscribe_fetcher_status: watch::Receiver<fetcher::Status>,
    events_tx: broadcast::Sender<Event>,
) -> Result<()> {
    loop {
        select! {
            _ = cancel.cancelled() => {
                return Err(CancelError.into());
            }
            changed = subscribe_fetcher_status.changed() => {
                changed?;
                let status = subscribe_fetcher_status.borrow_and_update().clone();
                // Decide before the status is moved into the event.
                let finished = matches!(status, fetcher::Status::Done);
                events_tx.send(Event::FetcherStatus(status))?;
                if finished {
                    return Ok(());
                }
            }
        }
    }
}
}