distrans_peer/peer/
mod.rsuse std::time::Duration;
use std::{future::Future, path::Path};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use tokio::sync::broadcast::Receiver;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use veilid_core::{CryptoKey, CryptoTyped, OperationId, Target, VeilidUpdate};
use distrans_fileindex::Index;
use crate::{proto::Header, Result};
pub type ShareKey = CryptoTyped<CryptoKey>;
pub trait Peer: Clone + Send {
fn subscribe_veilid_update(&self) -> Receiver<VeilidUpdate>;
fn reset(&mut self) -> impl Future<Output = Result<()>> + Send;
fn shutdown(self) -> impl Future<Output = Result<()>> + Send;
fn announce(
&mut self,
index: &Index,
) -> impl std::future::Future<Output = Result<(ShareKey, Target, Header)>> + Send;
fn reannounce_route(
&mut self,
key: &ShareKey,
prior_route: Option<Target>,
index: &Index,
header: &Header,
) -> impl std::future::Future<Output = Result<(Target, Header)>> + Send;
fn resolve(
&mut self,
key: &ShareKey,
root: &Path,
) -> impl std::future::Future<Output = Result<(Target, Header, Index)>> + Send;
fn reresolve_route(
&mut self,
key: &ShareKey,
prior_route: Option<Target>,
) -> impl Future<Output = Result<(Target, Header)>> + Send;
fn request_block(
&mut self,
target: Target,
piece: usize,
block: usize,
) -> impl Future<Output = Result<Vec<u8>>> + Send;
fn reply_block_contents(
&mut self,
call_id: OperationId,
contents: &[u8],
) -> impl std::future::Future<Output = Result<()>> + Send;
}
mod veilid;
pub use veilid::Veilid;
mod observable;
pub use observable::Observable;
pub use observable::State as PeerState;
pub async fn reset_with_backoff<T: Peer>(peer: &mut T, done: &CancellationToken) -> Result<()> {
let mut backoff = ExponentialBackoff::default();
loop {
match peer.reset().await {
Ok(()) => return Ok(()),
Err(e) => {
if !e.is_resetable() {
done.cancel();
return Err(e);
}
match backoff.next_backoff() {
Some(delay) => sleep(delay).await,
None => return Err(e),
}
}
}
}
}
pub fn retry_backoff() -> ExponentialBackoff {
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(Duration::from_secs(15));
backoff
}
pub fn reset_backoff() -> ExponentialBackoff {
ExponentialBackoff::default()
}
macro_rules! with_backoff_retry {
($op:expr) => {{
let mut retry_backoff = crate::retry_backoff();
let mut result = $op;
loop {
match result {
Ok(_) => break,
Err(ref e) => {
tracing::warn!(err = format!("{}", e));
match retry_backoff.next_backoff() {
Some(delay) => sleep(delay).await,
None => {
tracing::warn!("operation retries exceeded");
break;
}
};
result = $op;
}
}
}
result
}};
}
macro_rules! with_backoff_reset {
($peer:expr, $op:expr) => {{
let mut retry_backoff = crate::retry_backoff();
let mut reset_backoff = crate::reset_backoff();
let mut result = $op;
'retry: loop {
retry_backoff.reset();
'operation: loop {
match result {
Ok(_) => break 'retry,
Err(ref e) => {
tracing::warn!(err = format!("{}", e));
match retry_backoff.next_backoff() {
Some(delay) => sleep(delay).await,
None => {
break 'operation;
}
};
result = $op;
}
}
}
reset_backoff.reset();
'reset: loop {
match $peer.reset().await {
Ok(()) => break 'reset,
Err(ref e) => {
if !e.is_resetable() {
break 'retry;
}
tracing::warn!(err = format!("{}", e));
match reset_backoff.next_backoff() {
Some(delay) => sleep(delay).await,
None => {
break 'retry;
}
}
}
}
}
}
result
}};
}
pub(crate) use {with_backoff_retry, with_backoff_reset};