use futures::channel::oneshot;
use tor_cell::relaycell::msg::AnyRelayMsg;
use tor_cell::relaycell::RelayMsg;
use tor_error::internal;
use tor_proto::circuit::MetaCellDisposition;
use crate::FailedAttemptError;
/// Sending half of a one-shot reply channel carrying `Result<M, tor_proto::Error>`.
///
/// Created together with its [`Receiver`] by [`channel`].
pub(crate) struct Sender<M>(
// Held in an `Option` so that `deliver_expected_message` can move the inner
// one-shot sender out through `&mut self`; `None` means it was already used.
Option<oneshot::Sender<Result<M, tor_proto::Error>>>,
);
pub(crate) struct Receiver<M>(
oneshot::Receiver<Result<M, tor_proto::Error>>, )
);
/// Create a connected [`Sender`] / [`Receiver`] pair for a single message of type `M`.
///
/// The returned sender starts out holding its one-shot sender (`Some`).
pub(crate) fn channel<M>() -> (Sender<M>, Receiver<M>) {
    let (reply_tx, reply_rx) = oneshot::channel();
    let sender = Sender(Some(reply_tx));
    let receiver = Receiver(reply_rx);
    (sender, receiver)
}
impl<M> Sender<M> {
pub(crate) fn still_expected(&self) -> bool {
self.0.is_some()
}
pub(crate) fn deliver_expected_message(
&mut self,
msg: AnyRelayMsg,
disposition_on_success: MetaCellDisposition,
) -> Result<MetaCellDisposition, tor_proto::Error>
where
M: RelayMsg + Clone + TryFrom<AnyRelayMsg, Error = tor_cell::Error>,
{
let reply_tx = self
.0
.take()
.ok_or_else(|| internal!("Tried to handle two messages of the same type"))?;
let outcome = M::try_from(msg).map_err(|err| tor_proto::Error::CellDecodeErr {
object: "rendezvous-related cell",
err,
});
#[allow(clippy::unnecessary_lazy_evaluations)] reply_tx
.send(outcome.clone())
.unwrap_or_else(|_: Result<M, _>| ());
outcome.map(|_| disposition_on_success)
}
}
impl<M> Receiver<M> {
pub(crate) async fn recv(
self,
handle_proto_error: impl Fn(tor_proto::Error) -> FailedAttemptError + Copy,
) -> Result<M, FailedAttemptError> {
self.0
.await
.map_err(|_: oneshot::Canceled| tor_proto::Error::CircuitClosed)
.map_err(handle_proto_error)?
.map_err(handle_proto_error)
}
}