#![allow(unstable_name_collisions)]
#[cfg(feature = "conflux")]
pub(crate) mod msghandler;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicU64};
use std::sync::{Arc, Mutex};
use futures::{FutureExt as _, StreamExt, select_biased};
use itertools::Itertools;
use itertools::structs::ExactlyOneError;
use smallvec::{SmallVec, smallvec};
use tor_rtcompat::{SleepProvider as _, SleepProviderExt as _};
use tracing::{info, instrument, trace, warn};
use tor_cell::relaycell::AnyRelayMsgOuter;
use tor_error::{Bug, bad_api_usage, internal};
use tor_linkspec::HasRelayIds as _;
use crate::circuit::UniqId;
use crate::circuit::circhop::SendRelayCell;
use crate::client::circuit::TunnelMutableState;
#[cfg(feature = "circ-padding")]
use crate::client::circuit::padding::PaddingEvent;
use crate::client::circuit::path::HopDetail;
use crate::conflux::cmd_counts_towards_seqno;
use crate::conflux::msghandler::{ConfluxStatus, RemoveLegReason};
use crate::congestion::params::CongestionWindowParams;
use crate::crypto::cell::HopNum;
use crate::streammap;
use crate::tunnel::TunnelId;
use crate::util::err::ReactorError;
use crate::util::poll_all::PollAll;
use crate::util::tunnel_activity::TunnelActivity;
use super::circuit::CircHop;
use super::{Circuit, CircuitEvent};
#[cfg(feature = "conflux")]
use {
crate::conflux::msghandler::ConfluxMsgHandler,
msghandler::ClientConfluxMsgHandler,
tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
};
const MAX_CONFLUX_LEGS: usize = 16;
const NUM_CIRC_FUTURES: usize = 2;
const CIRC_EVENT_COUNT: usize = MAX_CONFLUX_LEGS * NUM_CIRC_FUTURES;
pub(super) struct ConfluxSet {
tunnel_id: TunnelId,
legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
mutable: Arc<TunnelMutableState>,
primary_id: UniqId,
join_point: Option<JoinPoint>,
#[cfg(feature = "conflux")]
nonce: V1Nonce,
#[cfg(feature = "conflux")]
desired_ux: V1DesiredUx,
last_seq_delivered: Arc<AtomicU64>,
selected_init_primary: bool,
}
#[derive(Clone, derive_more::Debug)]
struct JoinPoint {
hop: HopNum,
detail: HopDetail,
#[debug(skip)]
streams: Arc<Mutex<streammap::StreamMap>>,
}
impl ConfluxSet {
pub(super) fn new(
tunnel_id: TunnelId,
circuit_leg: Circuit,
) -> (Self, Arc<TunnelMutableState>) {
let primary_id = circuit_leg.unique_id();
let circ_mutable = Arc::clone(circuit_leg.mutable());
let legs = smallvec![circuit_leg];
let join_point = None;
#[cfg(feature = "conflux")]
let desired_ux = V1DesiredUx::NO_OPINION;
let mutable = Arc::new(TunnelMutableState::default());
mutable.insert(primary_id, circ_mutable);
let set = Self {
tunnel_id,
legs,
primary_id,
join_point,
mutable: mutable.clone(),
#[cfg(feature = "conflux")]
nonce: V1Nonce::new(&mut rand::rng()),
#[cfg(feature = "conflux")]
desired_ux,
last_seq_delivered: Arc::new(AtomicU64::new(0)),
selected_init_primary: false,
};
(set, mutable)
}
pub(super) fn take_single_leg(&mut self) -> Result<Circuit, Bug> {
let circ = self
.legs
.iter()
.exactly_one()
.map_err(NotSingleLegError::from)?;
let circ_id = circ.unique_id();
debug_assert!(circ_id == self.primary_id);
self.remove_unchecked(circ_id)
}
pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
Ok(self.legs.iter().exactly_one()?)
}
pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
Ok(self.legs.iter_mut().exactly_one()?)
}
pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
#[cfg(not(feature = "conflux"))]
if self.legs.len() > 1 {
return Err(internal!(
"got multipath tunnel, but conflux feature is disabled?!"
));
}
if self.legs.is_empty() {
Err(bad_api_usage!(
"tried to get circuit leg before creating it?!"
))
} else {
let circ = self
.leg_mut(self.primary_id)
.ok_or_else(|| internal!("conflux set is empty?!"))?;
Ok(circ)
}
}
pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
self.legs.iter().find(|circ| circ.unique_id() == leg_id)
}
pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
}
pub(super) fn len(&self) -> usize {
self.legs.len()
}
pub(super) fn is_empty(&self) -> bool {
self.legs.len() == 0
}
#[instrument(level = "trace", skip_all)]
pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
let circ = self.remove_unchecked(leg)?;
tracing::trace!(
circ_id = %circ.unique_id(),
"Circuit removed from conflux set"
);
self.mutable.remove(circ.unique_id());
if self.legs.is_empty() {
tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
return Err(ReactorError::Shutdown);
}
if leg == self.primary_id {
return Err(ReactorError::Shutdown);
}
cfg_if::cfg_if! {
if #[cfg(feature = "conflux")] {
self.remove_conflux(circ)
} else {
Err(internal!("Multiple legs in single-path tunnel?!").into())
}
}
}
#[cfg(feature = "conflux")]
fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
let Some(status) = circ.conflux_status() else {
return Err(internal!("Found non-conflux circuit in conflux set?!").into());
};
match status {
ConfluxStatus::Unlinked => {
Ok(circ)
}
ConfluxStatus::Pending | ConfluxStatus::Linked => {
let (circ_last_seq_recv, circ_last_seq_sent) =
(|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;
if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
if circ_last_seq_recv > max_last_seq_recv {
return Err(ReactorError::Shutdown);
}
}
if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
if circ_last_seq_sent > max_last_seq_sent {
return Err(ReactorError::Shutdown);
}
}
let hop = self.join_point_hop(&circ)?;
let (inflight, cwnd) = (|| {
let ccontrol = hop.ccontrol();
let inflight = ccontrol.inflight()?;
let cwnd = ccontrol.cwnd()?;
Some((inflight, cwnd))
})()
.ok_or_else(|| {
internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
})?;
if inflight >= cwnd.params().sendme_inc() {
return Err(ReactorError::Shutdown);
}
Ok(circ)
}
}
}
#[cfg(feature = "conflux")]
fn max_last_seq_recv(&self) -> Option<u64> {
self.legs
.iter()
.filter_map(|leg| leg.last_seq_recv().ok())
.max()
}
#[cfg(feature = "conflux")]
fn max_last_seq_sent(&self) -> Option<u64> {
self.legs
.iter()
.filter_map(|leg| leg.last_seq_sent().ok())
.max()
}
fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
return Err(internal!("No join point on conflux tunnel?!"));
};
circ.hop(join_point)
.ok_or_else(|| internal!("Conflux join point disappeared?!"))
}
fn circuits(&self) -> impl Iterator<Item = &Circuit> {
self.legs.iter()
}
pub(super) fn tunnel_activity(&self) -> TunnelActivity {
self.circuits()
.map(|c| c.hops.tunnel_activity())
.max()
.unwrap_or_else(TunnelActivity::never_used)
}
#[cfg(feature = "conflux")]
pub(super) fn add_legs(
&mut self,
legs: Vec<Circuit>,
runtime: &tor_rtcompat::DynTimeProvider,
) -> Result<(), Bug> {
if legs.is_empty() {
return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
}
let join_point = match self.join_point.take() {
Some(p) => {
p
}
None => {
let (hop, detail, streams) = (|| {
let first_leg = self.circuits().next()?;
let first_leg_path = first_leg.path();
let all_hops = first_leg_path.all_hops();
let hop_num = first_leg.last_hop_num()?;
let detail = all_hops.last()?;
let hop = first_leg.hop(hop_num)?;
let streams = Arc::clone(hop.stream_map());
Some((hop_num, detail.clone(), streams))
})()
.ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
JoinPoint {
hop,
detail,
streams,
}
}
};
let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
match (h1, h2) {
(HopDetail::Relay(t1), HopDetail::Relay(t2)) => Ok(t1.same_relay_ids(t2)),
#[cfg(feature = "hs-common")]
(HopDetail::Virtual, HopDetail::Virtual) => {
Err(internal!("onion service conflux not supported"))
}
_ => Ok(false),
}
};
let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
use crate::ccparams::Algorithm;
let path = leg.path();
let Some(last_hop) = path.all_hops().last() else {
return Ok(false);
};
let Some(last_hop_num) = leg.last_hop_num() else {
return Ok(false);
};
let circhop = leg
.hop(last_hop_num)
.ok_or_else(|| internal!("hop disappeared?!"))?;
let is_cc_suitable = match circhop.ccontrol().algorithm() {
Algorithm::FixedWindow(_) => false,
Algorithm::Vegas(_) => true,
};
if !is_cc_suitable {
return Ok(false);
}
Ok(last_hop_num == join_point.hop
&& hops_eq(last_hop, &join_point.detail)?
&& !leg.has_streams()
&& leg.conflux_status().is_none())
};
for leg in &legs {
if !leg_is_valid(leg)? {
return Err(bad_api_usage!("one more conflux circuits are invalid"));
}
}
self.join_point = Some(join_point.clone());
for circ in legs {
let mutable = Arc::clone(circ.mutable());
let unique_id = circ.unique_id();
self.legs.push(circ);
self.mutable.insert(unique_id, mutable);
}
let cwnd_params = self.cwnd_params()?;
for circ in self.legs.iter_mut() {
if circ.conflux_status().is_none() {
let handler = Box::new(ClientConfluxMsgHandler::new(
join_point.hop,
self.nonce,
Arc::clone(&self.last_seq_delivered),
cwnd_params,
runtime.clone(),
));
let conflux_handler =
ConfluxMsgHandler::new(handler, Arc::clone(&self.last_seq_delivered));
circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);
let last_hop = circ
.hop_mut(join_point.hop)
.ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
}
}
Ok(())
}
#[cfg(feature = "conflux")]
fn cwnd_params(&self) -> Result<CongestionWindowParams, Bug> {
let primary_leg = self
.leg(self.primary_id)
.ok_or_else(|| internal!("no primary leg?!"))?;
let join_point = self.join_point_hop(primary_leg)?;
let ccontrol = join_point.ccontrol();
let cwnd = ccontrol
.cwnd()
.ok_or_else(|| internal!("congestion control algorithm does not track the cwnd?!"))?;
Ok(*cwnd.params())
}
#[cfg(feature = "conflux")]
pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
use tor_error::into_internal;
let Some(join_point) = self.join_point.as_ref() else {
return Ok(None);
};
let join_point = join_point.hop;
if !self.should_update_primary_leg() {
return Ok(None);
}
let Some(new_primary_id) = self.select_primary_leg()? else {
return Ok(None);
};
if self.primary_id == new_primary_id {
return Ok(None);
}
let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
self.primary_id = new_primary_id;
let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
into_internal!("Seqno delta for switch does not fit in u32?!"),
)?;
self.primary_leg_mut()?
.set_last_seq_sent(prev_last_seq_sent)?;
let switch = ConfluxSwitch::new(seqno_delta);
let cell = AnyRelayMsgOuter::new(None, switch.into());
Ok(Some(SendRelayCell {
hop: Some(join_point),
early: false,
cell,
}))
}
#[cfg(feature = "conflux")]
fn should_update_primary_leg(&mut self) -> bool {
if !self.selected_init_primary {
self.maybe_select_init_primary();
return false;
}
if self.legs.len() < 2 {
return false;
}
true
}
#[cfg(feature = "conflux")]
fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
match self.desired_ux {
V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
self.select_primary_leg_min_rtt(false)
}
V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
self.select_primary_leg_min_rtt(false)
}
_ => {
warn!(
tunnel_id = %self.tunnel_id,
"Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
self.desired_ux
);
self.select_primary_leg_min_rtt(false)
}
}
}
#[cfg(feature = "conflux")]
fn maybe_select_init_primary(&mut self) {
let best = self
.legs
.iter()
.filter_map(|leg| leg.init_rtt().map(|rtt| (leg, rtt)))
.min_by_key(|(_leg, rtt)| *rtt)
.map(|(leg, _rtt)| leg.unique_id());
if let Some(best) = best {
self.primary_id = best;
self.selected_init_primary = true;
}
}
#[cfg(feature = "conflux")]
fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
let mut best = None;
for circ in self.legs.iter() {
let leg_id = circ.unique_id();
let join_point = self.join_point_hop(circ)?;
let ccontrol = join_point.ccontrol();
if check_can_send && !ccontrol.can_send() {
continue;
}
let rtt = ccontrol.rtt();
let init_rtt_usec = || {
circ.init_rtt()
.map(|rtt| u32::try_from(rtt.as_micros()).unwrap_or(u32::MAX))
};
let Some(ewma_rtt) = rtt.ewma_rtt_usec().or_else(init_rtt_usec) else {
return Err(internal!(
"attempted to select primary leg before handshake completed?!"
));
};
match best.take() {
None => {
best = Some((leg_id, ewma_rtt));
}
Some(best_so_far) => {
if best_so_far.1 <= ewma_rtt {
best = Some(best_so_far);
} else {
best = Some((leg_id, ewma_rtt));
}
}
}
}
Ok(best.map(|(leg_id, _)| leg_id))
}
#[cfg(feature = "conflux")]
fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
let join_circhop = circuit.hop(join_hop).ok_or_else(|| {
internal!(
"Join point hop {} not found on circuit {}?!",
join_hop.display(),
circuit.unique_id(),
)
})?;
Ok(!join_circhop.ccontrol().can_send())
}
#[cfg(feature = "conflux")]
fn should_skip_join_point(&self) -> Result<bool, Bug> {
let Some(primary_join_point) = self.primary_join_point() else {
return Ok(false);
};
let join_hop = primary_join_point.1;
let primary_blocked_on_cc = {
let primary = self
.leg(self.primary_id)
.ok_or_else(|| internal!("primary leg disappeared?!"))?;
Self::is_join_point_blocked_on_cc(join_hop, primary)?
};
if !primary_blocked_on_cc {
return Ok(false);
}
let should_skip = if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
trace!(
tunnel_id = %self.tunnel_id,
join_point = ?primary_join_point,
reason = "sending leg blocked on congestion control",
"Pausing join point stream reads"
);
true
} else {
let mut all_blocked_on_cc = true;
for leg in &self.legs {
all_blocked_on_cc = Self::is_join_point_blocked_on_cc(join_hop, leg)?;
if !all_blocked_on_cc {
break;
}
}
if all_blocked_on_cc {
trace!(
tunnel_id = %self.tunnel_id,
join_point = ?primary_join_point,
reason = "all legs blocked on congestion control",
"Pausing join point stream reads"
);
true
} else {
false
}
};
Ok(should_skip)
}
#[allow(clippy::unnecessary_wraps)] #[instrument(level = "trace", skip_all)]
pub(super) async fn next_circ_event(
&mut self,
runtime: &tor_rtcompat::DynTimeProvider,
) -> Result<SmallVec<[CircuitEvent; CIRC_EVENT_COUNT]>, crate::Error> {
cfg_if::cfg_if! {
if #[cfg(feature = "conflux")] {
let mut should_poll_join_point = !self.should_skip_join_point()?;
} else {
let mut should_poll_join_point = true;
}
};
let join_point = self.primary_join_point().map(|join_point| join_point.1);
let mut poll_all =
PollAll::<MAX_CONFLUX_LEGS, SmallVec<[CircuitEvent; NUM_CIRC_FUTURES]>>::new();
for leg in &mut self.legs {
let unique_id = leg.unique_id();
let tunnel_id = self.tunnel_id;
let runtime = runtime.clone();
leg.remove_expired_halfstreams(runtime.now());
let conflux_hs_timeout = leg.conflux_hs_timeout();
let mut poll_all_circ = PollAll::<NUM_CIRC_FUTURES, CircuitEvent>::new();
let input = leg.input.next().map(move |res| match res {
Some(msg) => match msg.try_into() {
Ok(cell) => CircuitEvent::HandleCell {
leg: unique_id,
cell,
},
Err(e) => CircuitEvent::ProtoViolation { err: e },
},
None => CircuitEvent::RemoveLeg {
leg: unique_id,
reason: RemoveLegReason::ChannelClosed,
},
});
poll_all_circ.push(input);
let chan_ready_fut = futures::future::poll_fn(|cx| {
use futures::Sink as _;
Pin::new(&mut leg.chan_sender).poll_ready(cx)
});
let exclude_hop = if should_poll_join_point {
should_poll_join_point = false;
None
} else {
join_point
};
let mut ready_streams = leg.hops.ready_streams_iterator(exclude_hop);
let next_ready_stream = async move {
let _ = chan_ready_fut.await;
match ready_streams.next().await {
Some(x) => x,
None => {
info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
let () = std::future::pending().await;
unreachable!();
}
}
};
poll_all_circ.push(next_ready_stream.map(move |cmd| CircuitEvent::RunCmd {
leg: unique_id,
cmd,
}));
let mut next_padding_event_fut = leg.padding_event_stream.next();
poll_all.push(
async move {
let conflux_hs_timeout = if let Some(timeout) = conflux_hs_timeout {
Box::pin(runtime.sleep_until_wallclock(timeout))
as Pin<Box<dyn Future<Output = ()> + Send>>
} else {
Box::pin(std::future::pending())
};
select_biased! {
() = conflux_hs_timeout.fuse() => {
warn!(
tunnel_id = %tunnel_id,
circ_id = %unique_id,
"Conflux handshake timed out on circuit"
);
smallvec![CircuitEvent::RemoveLeg {
leg: unique_id,
reason: RemoveLegReason::ConfluxHandshakeTimeout,
}]
}
padding_event = next_padding_event_fut => {
smallvec![CircuitEvent::PaddingAction {
leg: unique_id,
padding_event:
padding_event.expect("PaddingEventStream, surprisingly, was terminated!"),
}]
}
ret = poll_all_circ.fuse() => ret,
}
}
);
}
Ok(poll_all.await.into_iter().flatten().collect())
}
pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
self.join_point
.as_ref()
.map(|join_point| (self.primary_id, join_point.hop))
}
pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
self.leg(leg)?.uses_stream_sendme(hop)
}
#[instrument(level = "trace", skip_all)]
pub(super) async fn send_relay_cell_on_leg(
&mut self,
msg: SendRelayCell,
leg: Option<UniqId>,
) -> crate::Result<()> {
let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
let leg = if let Some(join_point) = conflux_join_point {
let hop = msg.hop.expect("missing hop in client SendRelayCell?!");
if cmd_counts_towards_seqno(msg.cell.cmd()) {
if hop != join_point {
leg
} else {
let old_primary_leg = self.primary_id;
#[cfg(feature = "conflux")]
if let Some(switch_cell) = self.maybe_update_primary_leg()? {
trace!(
old = ?old_primary_leg,
new = ?self.primary_id,
"Switching primary conflux leg..."
);
self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
}
Some(self.primary_id)
}
} else {
leg
}
} else {
leg
};
let leg = leg.unwrap_or(self.primary_id);
let circ = self
.leg_mut(leg)
.ok_or_else(|| internal!("leg disappeared?!"))?;
circ.send_relay_cell(msg).await
}
#[cfg(feature = "conflux")]
pub(super) async fn link_circuits(
&mut self,
runtime: &tor_rtcompat::DynTimeProvider,
) -> crate::Result<()> {
let (_leg_id, join_point) = self
.primary_join_point()
.ok_or_else(|| internal!("no join point when trying to send LINK"))?;
for circ in self
.legs
.iter_mut()
.filter(|circ| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
{
let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
let link = ConfluxLink::new(v1_payload);
let cell = AnyRelayMsgOuter::new(None, link.into());
circ.begin_conflux_link(join_point, cell, runtime).await?;
}
Ok(())
}
#[cfg(feature = "conflux")]
pub(super) fn num_unlinked(&self) -> usize {
self.circuits()
.filter(|circ| {
let status = circ.conflux_status();
status.is_none() || status == Some(ConfluxStatus::Unlinked)
})
.count()
}
pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
seq_recv == last_seq_delivered + 1
}
fn remove_unchecked(&mut self, circ_id: UniqId) -> Result<Circuit, Bug> {
let idx = self
.legs
.iter()
.position(|circ| circ.unique_id() == circ_id)
.ok_or_else(|| internal!("leg {circ_id:?} not found in conflux set"))?;
Ok(self.legs.remove(idx))
}
#[cfg(feature = "circ-padding")]
pub(super) async fn run_padding_event(
&mut self,
circ_id: UniqId,
padding_event: PaddingEvent,
) -> crate::Result<()> {
use PaddingEvent as E;
let Some(circ) = self.leg_mut(circ_id) else {
return Ok(());
};
match padding_event {
E::SendPadding(send_padding) => {
circ.send_padding(send_padding).await?;
}
E::StartBlocking(start_blocking) => {
circ.start_blocking_for_padding(start_blocking);
}
E::StopBlocking => {
circ.stop_blocking_for_padding();
}
}
Ok(())
}
}
#[derive(Clone, Debug, derive_more::Display, thiserror::Error)]
pub(super) struct NotSingleLegError(#[source] Bug);
impl From<NotSingleLegError> for Bug {
fn from(e: NotSingleLegError) -> Self {
e.0
}
}
impl From<NotSingleLegError> for crate::Error {
fn from(e: NotSingleLegError) -> Self {
Self::from(e.0)
}
}
impl From<NotSingleLegError> for ReactorError {
fn from(e: NotSingleLegError) -> Self {
Self::from(e.0)
}
}
impl<I: Iterator> From<ExactlyOneError<I>> for NotSingleLegError {
fn from(e: ExactlyOneError<I>) -> Self {
Self(bad_api_usage!("not a single leg conflux set ({e})"))
}
}
#[cfg(test)]
mod test {
}