use crate::util::timeout;
use super::*;
pub async fn cancellable<T>(
future: impl Future<Output = T> + Unpin,
cancel: impl Future<Output = ()>,
) -> Result<T, Cancelled> {
use futures::future::Either;
futures::pin_mut!(cancel);
match futures::future::select(cancel, future).await {
Either::Left(((), _)) => Err(Cancelled),
Either::Right((val, _)) => Ok(val),
}
}
pub async fn cancellable_2<T, C: Future<Output = ()> + Unpin>(
future: impl Future<Output = T> + Unpin,
cancel: C,
) -> Result<(T, C), Cancelled> {
use futures::future::Either;
match futures::future::select(cancel, future).await {
Either::Left(((), _)) => Err(Cancelled),
Either::Right((val, cancel)) => Ok((val, cancel)),
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Cancelled;
impl std::fmt::Display for Cancelled {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Task has been cancelled")
}
}
const SHUTDOWN_TIME: std::time::Duration = std::time::Duration::from_secs(5);
macro_rules! with_cancel_wormhole {
($wormhole:ident, run = $run:expr_2021, $cancel:expr_2021, ret_cancel = $ret_cancel:expr_2021 $(,)?) => {{
let run = Box::pin($run);
let result = cancel::cancellable_2(run, $cancel).await;
let Some((transit, wormhole, cancel)) =
cancel::handle_run_result_noclose($wormhole, result).await?
else {
return Ok($ret_cancel);
};
(transit, wormhole, cancel)
}};
}
#[cfg(feature = "experimental-transfer-v2")]
pub(super) use with_cancel_wormhole;
#[rustfmt::skip]
macro_rules! with_cancel_transit {
($transit:ident, run = $run:expr_2021, $cancel:expr_2021, $make_error_message:expr_2021, $parse_message:expr_2021, ret_cancel = $ret_cancel:expr_2021 $(,)?) => {{
let run = Box::pin($run);
let result = cancel::cancellable_2(run, $cancel).await;
let Some((value, transit)) = cancel::handle_run_result_transit(
$transit,
result,
$make_error_message,
$parse_message,
).await? else { return Ok($ret_cancel); };
(value, transit)
}};
}
#[cfg(feature = "experimental-transfer-v2")]
pub(super) use with_cancel_transit;
async fn wrap_timeout(run: impl Future<Output = ()>, cancel: impl Future<Output = ()>) {
let run = timeout(SHUTDOWN_TIME, run);
futures::pin_mut!(run);
match cancellable(run, cancel).await {
Ok(Ok(())) => {},
Ok(Err(_timeout)) => tracing::debug!("Post-transfer timed out"),
Err(_cancelled) => tracing::debug!("Post-transfer got cancelled by user"),
};
}
fn debug_err(result: Result<(), impl std::fmt::Display>, operation: &str) {
if let Err(error) = result {
tracing::debug!("Failed to {} after transfer: {}", operation, error);
}
}
pub async fn handle_run_result(
wormhole: Wormhole,
result: Result<(Result<(), TransferError>, impl Future<Output = ()>), Cancelled>,
) -> Result<(), TransferError> {
match handle_run_result_noclose(wormhole, result).await {
Ok(Some(((), wormhole, cancel))) => {
tracing::debug!("Transfer done, doing cleanup logic");
wrap_timeout(
async {
debug_err(wormhole.close().await, "close Wormhole");
},
cancel,
)
.await;
Ok(())
},
Ok(None) => Ok(()),
Err(e) => Err(e),
}
}
pub async fn handle_run_result_noclose<T, C: Future<Output = ()>>(
mut wormhole: Wormhole,
result: Result<(Result<T, TransferError>, C), Cancelled>,
) -> Result<Option<(T, Wormhole, C)>, TransferError> {
match result {
Ok((Ok(val), cancel)) => Ok(Some((val, wormhole, cancel))),
Ok((Err(error @ TransferError::PeerError(_)), cancel)) => {
tracing::debug!(
"Transfer encountered an error ({}), doing cleanup logic",
error
);
wrap_timeout(
async {
debug_err(wormhole.close().await, "close Wormhole");
},
cancel,
)
.await;
Err(error)
},
Ok((Err(mut error @ TransferError::Transit(_)), cancel)) => {
tracing::debug!(
"Transfer encountered an error ({}), doing cleanup logic",
error
);
wrap_timeout(async {
match timeout(SHUTDOWN_TIME / 3, wormhole.receive_json()).await { Ok(Ok(Ok(PeerMessage::Error(e)))) => {
error = TransferError::PeerError(e);
} _ => {
tracing::debug!("Failed to retrieve more specific error message from peer. Maybe it crashed?");
}}
debug_err(wormhole.close().await, "close Wormhole");
}, cancel).await;
Err(error)
},
Ok((Err(error), cancel)) => {
tracing::debug!(
"Transfer encountered an error ({}), doing cleanup logic",
error
);
wrap_timeout(
async {
debug_err(
wormhole
.send_json(&PeerMessage::Error(format!("{error}")))
.await,
"notify peer about the error",
);
debug_err(wormhole.close().await, "close Wormhole");
},
cancel,
)
.await;
Err(error)
},
Err(cancelled) => {
tracing::debug!("Transfer got cancelled, doing cleanup logic");
wrap_timeout(
async {
debug_err(
wormhole
.send_json(&PeerMessage::Error(format!("{cancelled}")))
.await,
"notify peer about our cancellation",
);
debug_err(wormhole.close().await, "close Wormhole");
},
futures::future::pending(),
)
.await;
Ok(None)
},
}
}
#[cfg(feature = "experimental-transfer-v2")]
pub async fn handle_run_result_transit<T>(
mut transit: transit::Transit,
result: Result<(Result<T, TransferError>, impl Future<Output = ()>), Cancelled>,
make_error_message: impl FnOnce(&(dyn std::string::ToString + Sync)) -> Vec<u8>,
parse_message: impl Fn(&[u8]) -> Result<Option<String>, TransferError>,
) -> Result<Option<(T, transit::Transit)>, TransferError> {
match result {
Ok((Ok(val), _cancel)) => Ok(Some((val, transit))),
Ok((Err(error @ TransferError::PeerError(_)), _cancel)) => {
tracing::debug!(
"Transfer encountered an error ({}), doing cleanup logic",
error
);
Err(error)
},
Ok((Err(mut error @ TransferError::Transit(_)), cancel)) => {
tracing::debug!(
"Transfer encountered an error ({}), doing cleanup logic",
error
);
wrap_timeout(
async {
loop {
let Ok(msg) = transit.receive_record().await else {
break;
};
match parse_message(&msg) {
Ok(None) => continue,
Ok(Some(err)) => {
error = TransferError::PeerError(err);
break;
},
Err(_) => break,
}
}
},
cancel,
)
.await;
Err(error)
},
Ok((Err(error), cancel)) => {
tracing::debug!(
"Transfer encountered an error ({}), doing cleanup logic",
error
);
wrap_timeout(
async {
debug_err(
transit.send_record(&make_error_message(&error)).await,
"notify peer about the error",
);
},
cancel,
)
.await;
Err(error)
},
Err(cancelled) => {
tracing::debug!("Transfer got cancelled, doing cleanup logic");
wrap_timeout(
async {
debug_err(
transit.send_record(&make_error_message(&cancelled)).await,
"notify peer about our cancellation",
);
},
futures::future::pending(),
)
.await;
Ok(None)
},
}
}