//! Abstract implementation of a channel manager
use crate::mgr::state::{OpenEntry, PendingEntry};
use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
use crate::factory::BootstrapReporter;
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::{FutureExt, Shared};
use rand::Rng;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Duration;
use tor_error::internal;
use tor_linkspec::{HasRelayIds, RelayIds};
use tor_netdir::params::NetParameters;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
mod state;
/// The subset of [`Channel`](tor_proto::channel::Channel) behavior that
/// `AbstractChanMgr` needs to use.
///
/// Implementors must be `Clone` and must expose their relay identities
/// through [`HasRelayIds`]: the manager keeps one copy of each open channel
/// in its map and hands clones of it back to callers.
pub(crate) trait AbstractChannel: Clone + HasRelayIds {
/// Return true if this channel is usable.
///
/// A channel might be unusable because it is closed, because it has
/// hit a bug, or for some other reason. We don't return unusable
/// channels back to the user.
fn is_usable(&self) -> bool;
/// Return how long this channel has gone without being used.
///
/// Returns `None` if the channel is currently in use.
fn duration_unused(&self) -> Option<Duration>;
/// Reparameterize this channel according to the provided `ChannelPaddingInstructionsUpdates`
///
/// The changed parameters may not be implemented "immediately",
/// but this will be done "reasonably soon".
fn reparameterize(
&self,
updates: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()>;
/// Specify that this channel should do activities related to channel padding
///
/// The manager calls this on channels requested for user traffic.
///
/// See [`Channel::engage_padding_activities`]
///
/// [`Channel::engage_padding_activities`]: tor_proto::channel::Channel::engage_padding_activities
fn engage_padding_activities(&self);
}
/// Trait to describe how channel-like objects are created.
///
/// This differs from [`ChannelFactory`](crate::factory::ChannelFactory) in that
/// it's a purely crate-internal type that we use to decouple the
/// AbstractChanMgr code from actual "what is a channel" concerns.
#[async_trait]
pub(crate) trait AbstractChannelFactory {
/// The type of channel that this factory can build.
type Channel: AbstractChannel;
/// Type that explains how to build a channel.
///
/// Its relay identities are what the manager uses to look up existing
/// and pending channels for the same target.
type BuildSpec: HasRelayIds;
/// Construct a new channel to the destination described at `target`.
///
/// `reporter` is a clone of the manager's [`BootstrapReporter`], supplied
/// afresh on every call.
///
/// This function must take care of all timeouts, error detection,
/// and so on.
///
/// It should not retry; that is handled at a higher level.
async fn build_channel(
&self,
target: &Self::BuildSpec,
reporter: BootstrapReporter,
) -> Result<Self::Channel>;
}
/// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr).
///
/// This type does the work of keeping track of open channels and pending
/// channel requests, launching requests as needed, waiting for pending
/// requests, and so forth.
///
/// The actual job of launching connections is deferred to an
/// `AbstractChannelFactory` type.
pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
/// All internal state held by this channel manager.
///
/// The most important part is the map from relay identity to channel, or
/// to pending channel status. The channel factory itself is also kept
/// in here.
pub(crate) channels: state::MgrState<CF>,
/// A bootstrap reporter to give out when building channels.
///
/// It is cloned once for every call to the factory's `build_channel`.
pub(crate) reporter: BootstrapReporter,
}
/// Type alias for a future that we wait on to see when a pending
/// channel is done or failed.
///
/// It is a `Shared` future so that every task interested in the same
/// pending channel can hold its own clone. It yields `Ok(Ok(()))` when the
/// launching task reports success, `Ok(Err(_))` when it reports a failure,
/// and `Err(_)` if the sending half was dropped without reporting anything.
type Pending = Shared<oneshot::Receiver<Result<()>>>;
/// Type alias for the sender we notify when we complete a channel (or fail to
/// complete it).
///
/// Only the outcome is sent, not the channel itself: waiters look the
/// channel up in the channel map after being notified.
type Sending = oneshot::Sender<Result<()>>;
impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
/// Make a new empty channel manager.
pub(crate) fn new(
connector: CF,
config: &ChannelConfig,
dormancy: Dormancy,
netparams: &NetParameters,
reporter: BootstrapReporter,
) -> Self {
AbstractChanMgr {
channels: state::MgrState::new(connector, config.clone(), dormancy, netparams),
reporter,
}
}
/// Give `func` mutable access to the channel factory held by this manager.
// NOTE(review): presumably not called in every build configuration, hence
// the lint allowance below; confirm before removing it.
#[allow(dead_code)]
pub(crate) fn with_mut_builder<F: FnOnce(&mut CF)>(&self, func: F) {
    self.channels.with_mut_builder(func);
}
/// Remove every unusable entry from this channel manager.
///
/// Test only. Forwards to the state object's `remove_unusable` and returns
/// whatever error it reports.
#[cfg(test)]
pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
self.channels.remove_unusable()
}
/// Helper: create the bookkeeping needed to launch a channel to `ids`.
///
/// Returns a `Building` map entry (holding the shared receiver that
/// other tasks can wait on) together with the sender that the launching
/// task must use to announce the outcome.
fn setup_launch<C: Clone>(&self, ids: RelayIds) -> (state::ChannelState<C>, Sending) {
    let (notifier, receiver) = oneshot::channel();
    let entry = state::PendingEntry {
        ids,
        pending: receiver.shared(),
    };
    (state::ChannelState::Building(entry), notifier)
}
/// Get a channel corresponding to the identities of `target`.
///
/// If a usable channel exists with that identity, return it.
///
/// If no such channel exists already, and none is in progress,
/// launch a new request using `target`.
///
/// If no such channel exists already, but we have one that's in
/// progress, wait for it to succeed or fail.
pub(crate) async fn get_or_launch(
&self,
target: CF::BuildSpec,
usage: ChannelUsage,
) -> Result<(CF::Channel, ChanProvenance)> {
use ChannelUsage as CU;
let chan = self.get_or_launch_internal(target).await?;
match usage {
CU::Dir | CU::UselessCircuit => {}
CU::UserTraffic => chan.0.engage_padding_activities(),
}
Ok(chan)
}
/// Internal implementation of `get_or_launch`: find, wait for, or build a
/// channel matching the identities of `target`.
///
/// We make up to `N_ATTEMPTS` ordinary attempts. If an attempt finishes
/// with a success notification but without a channel in hand, we make one
/// extra "final" pass that only inspects the channel map and never waits
/// or launches.
///
/// On success, returns the channel along with a [`ChanProvenance`] saying
/// whether it already existed or was newly created. On failure, returns
/// the most recently recorded error.
async fn get_or_launch_internal(
&self,
target: CF::BuildSpec,
) -> Result<(CF::Channel, ChanProvenance)> {
/// How many times do we try?
const N_ATTEMPTS: usize = 2;
let mut attempts_so_far = 0;
let mut final_attempt = false;
let mut provenance = ChanProvenance::Preexisting;
// TODO(nickm): It would be neat to use tor_retry instead.
let mut last_err = None;
// Once `final_attempt` is set, `choose_action` only yields `None` or a
// `Return`, so the next pass always breaks or returns.
while attempts_so_far < N_ATTEMPTS || final_attempt {
attempts_so_far += 1;
// For each attempt, we _first_ look at the state of the channel map
// to decide on an `Action`, and _then_ we execute that action.
// First, see what state we're in, and what we should do about it.
let action = self.choose_action(&target, final_attempt)?;
// We are done deciding on our Action! It's time to act based on the
// Action that we chose.
match action {
// If this happens, we were trying to make one final check of our state, but
// we would have had to make additional attempts.
None => {
if !final_attempt {
return Err(Error::Internal(internal!(
"No action returned while not on final attempt"
)));
}
break;
}
// Easy case: we have an error or a channel to return.
Some(Action::Return(v)) => {
return v.map(|chan| (chan, provenance));
}
// There's an in-progress channel. Wait for it.
Some(Action::Wait(pend)) => {
match pend.await {
Ok(Ok(())) => {
// We were waiting for a channel, and it succeeded, or it
// got cancelled. But it might have gotten more
// identities while negotiating than it had when it was
// launched, or it might have failed to get all the
// identities we want. Check for this.
final_attempt = true;
provenance = ChanProvenance::NewlyCreated;
// Keep any earlier, more specific error; otherwise report
// a cancelled request if the final check finds nothing.
last_err.get_or_insert(Error::RequestCancelled);
}
Ok(Err(e)) => {
last_err = Some(e);
}
Err(_) => {
// The sender was dropped without reporting an outcome.
last_err =
Some(Error::Internal(internal!("channel build task disappeared")));
}
}
}
// We need to launch a channel.
Some(Action::Launch(send)) => {
let connector = self.channels.builder();
let outcome = connector
.build_channel(&target, self.reporter.clone())
.await;
let status = self.handle_build_outcome(&target, outcome);
// It's okay if all the receivers went away:
// that means that nobody was waiting for this channel.
let _ignore_err = send.send(status.clone().map(|_| ()));
match status {
Ok(Some(chan)) => {
return Ok((chan, ChanProvenance::NewlyCreated));
}
Ok(None) => {
// The build succeeded, but our map entry was removed or
// replaced in the meantime: take one last look at the map.
final_attempt = true;
provenance = ChanProvenance::NewlyCreated;
last_err.get_or_insert(Error::RequestCancelled);
}
Err(e) => last_err = Some(e),
}
}
}
// End of this attempt. We will try again...
}
Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
}
/// Helper: based on our internal state, decide which action to take when
/// asked for a channel, and update our internal state accordingly.
///
/// The decision is made in this order:
///
/// 1. An entry matching _all_ of the target's identities: return it if
///    it is open and usable, replace it with a new launch if it is open
///    but unusable, or wait on it if it is still being built.
/// 2. An open, usable entry matching only _some_ identities: report
///    `Error::IdentityConflict`.
/// 3. A pending entry matching only _some_ identities: wait on it.
/// 4. Otherwise: register a new pending entry and launch.
///
/// If `final_attempt` is true, then we will not pick any action that does
/// not result in an immediate result. If we would pick such an action, we
/// instead return `Ok(None)`. (We could instead have the caller detect
/// such actions, but it's less efficient to construct them, insert them,
/// and immediately revert them.)
///
/// Returns an error if the channel state cannot be accessed, or if
/// inserting the new pending entry fails.
fn choose_action(
    &self,
    target: &CF::BuildSpec,
    final_attempt: bool,
) -> Result<Option<Action<CF::Channel>>> {
    use state::ChannelState::*;
    self.channels.with_channels(|channel_map| {
        match channel_map.by_all_ids(target) {
            Some(Open(OpenEntry { channel, .. })) => {
                if channel.is_usable() {
                    // This entry is a perfect match for the target keys:
                    // we'll return the open entry.
                    return Ok(Some(Action::Return(Ok(channel.clone()))));
                } else if final_attempt {
                    // We don't launch an attempt in this case.
                    return Ok(None);
                } else {
                    // This entry was a perfect match for the target, but it
                    // is no longer usable! We launch a new connection to
                    // this target, and wait on that.
                    let (new_state, send) =
                        self.setup_launch(RelayIds::from_relay_ids(target));
                    channel_map.try_insert(new_state)?;
                    return Ok(Some(Action::Launch(send)));
                }
            }
            Some(Building(PendingEntry { pending, .. })) => {
                // This entry is a perfect match for the target keys: we'll
                // return the pending entry. (We don't know for sure if it
                // will match once it completes, since we might discover
                // additional keys beyond those listed for this pending
                // entry.)
                if final_attempt {
                    // We don't wait in this case.
                    return Ok(None);
                }
                return Ok(Some(Action::Wait(pending.clone())));
            }
            // No exact match: fall through and look at partial matches.
            None => {}
        }

        // Okay, we don't have an exact match. But we might have one or more
        // _partial_ matches?
        let overlapping = channel_map.all_overlapping(target);
        if overlapping
            .iter()
            .any(|entry| matches!(entry, Open(OpenEntry { channel, .. }) if channel.is_usable()))
        {
            // At least one *open, usable* channel has been negotiated that
            // overlaps only partially with our target: it has proven itself
            // to have _one_ of our target identities, but not all.
            //
            // Because this channel exists, we know that our target cannot
            // succeed, since relays are not allowed to share _any_
            // identities.
            return Ok(Some(Action::Return(Err(Error::IdentityConflict))));
        } else if let Some(pending) = overlapping.iter().find_map(|entry| match entry {
            Building(PendingEntry { pending, .. }) => Some(pending),
            Open(_) => None,
        }) {
            // There is at least one *in-progress* channel that has at least
            // one identity in common with our target, but it does not have
            // _all_ the identities we want.
            //
            // If it succeeds, we might find that we can use it; or we might
            // find out that it is not suitable. So we'll wait for the first
            // such entry, and see what happens.
            //
            // TODO: This approach will _not_ be sufficient once we are
            // implementing relays, and some of our channels are created in
            // response to client requests.
            if final_attempt {
                // We don't wait in this case.
                return Ok(None);
            }
            return Ok(Some(Action::Wait(pending.clone())));
        }

        if final_attempt {
            // We don't launch an attempt in this case.
            return Ok(None);
        }

        // Great, nothing interfered at all.
        let (new_state, send) = self.setup_launch(RelayIds::from_relay_ids(target));
        channel_map.try_insert(new_state)?;
        Ok(Some(Action::Launch(send)))
    })?
}
/// Record the result of a channel build that we launched, and decide what
/// the launching task should do next.
///
/// * `Ok(Some(chan))`: the channel was built and stored in the map.
/// * `Ok(None)`: the build succeeded, but our pending entry was removed or
///   replaced meanwhile; we expect one final call to `choose_action` to
///   sort this out.
/// * `Err(_)`: the build failed (our pending entry has been cleaned up),
///   the new channel could not be given its initial parameters, or the
///   channel state could not be accessed.
fn handle_build_outcome(
    &self,
    target: &CF::BuildSpec,
    outcome: Result<CF::Channel>,
) -> Result<Option<CF::Channel>> {
    use state::ChannelState::*;

    // Deal with the failure case first: make the target non-pending and
    // hand the build error back to the caller.
    let chan = match outcome {
        Ok(chan) => chan,
        Err(build_error) => {
            self.channels.with_channels(|channel_map| {
                match channel_map.remove_exact(target) {
                    Some(replacement @ Open(_)) => {
                        // Oh no. Something else built an entry here, and
                        // replaced us. Put that something back.
                        channel_map.insert(replacement);
                    }
                    Some(Building(_)) | None => {
                        // Our pending entry is gone now, either because we
                        // just removed it or because somebody else did.
                    }
                }
            })?;
            return Err(build_error);
        }
    };

    // The channel got built: remember it, and return it.
    self.channels
        .with_channels_and_params(|channel_map, channels_params| {
            match channel_map.remove_exact(target) {
                None => {
                    // Something removed our entry from the list.
                    // Time to retry.
                    return Ok(None);
                }
                Some(replacement @ Open(_)) => {
                    // Something else built an entry here, and replaced us.
                    // Put that something back, and retry.
                    channel_map.insert(replacement);
                    return Ok(None);
                }
                Some(Building(_)) => {
                    // We removed our own pending entry; carry on and add
                    // the channel we just built.
                }
            }

            // This isn't great. We context switch to the newly-created
            // channel just to tell it how and whether to do padding. Ideally
            // we would pass the params at some suitable point during
            // building. However, that would involve the channel taking a
            // copy of the params, and that must happen in the same channel
            // manager lock acquisition span as the one where we insert the
            // channel into the table so it will receive updates. I.e.,
            // here.
            if let Some(update) = channels_params.initial_update() {
                chan.reparameterize(update.into())
                    .map_err(|_| internal!("failure on new channel"))?;
            }

            // Each channel gets a randomized idle lifetime, in seconds.
            let max_unused_duration =
                Duration::from_secs(rand::thread_rng().gen_range(180..270));
            channel_map.insert(Open(OpenEntry {
                channel: chan.clone(),
                max_unused_duration,
            }));
            Ok(Some(chan))
        })?
}
/// Apply updated network parameters to this channel manager.
///
/// Forwards `netparams` to the state's `reconfigure_general`, leaving the
/// channel configuration and the dormancy unchanged (both are passed as
/// `None`).
pub(crate) fn update_netparams(
&self,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), tor_error::Bug> {
self.channels.reconfigure_general(None, None, netparams)
}
/// Set the dormancy of this channel manager to `dormancy`.
///
/// Forwards to the state's `reconfigure_general` with the new dormancy
/// and the current `netparams`, leaving the channel configuration
/// unchanged (it is passed as `None`).
pub(crate) fn set_dormancy(
&self,
dormancy: Dormancy,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), tor_error::Bug> {
self.channels
.reconfigure_general(None, Some(dormancy), netparams)
}
/// Reconfigure all channels according to a new `config`.
///
/// Forwards to the state's `reconfigure_general` with the new
/// configuration and the current `netparams`, leaving the dormancy
/// unchanged (it is passed as `None`).
pub(crate) fn reconfigure(
&self,
config: &ChannelConfig,
netparams: Arc<dyn AsRef<NetParameters>>,
) -> StdResult<(), tor_error::Bug> {
self.channels
.reconfigure_general(Some(config), None, netparams)
}
/// Expire any channels that have been unused longer than
/// their maximum unused duration assigned during creation.
///
/// Return the duration from now until the next channel expires.
///
/// If all channels are in use or there are no open channels,
/// return 180 seconds, which is the minimum value of
/// `max_unused_duration`.
// NOTE(review): the behavior described above is implemented by the state
// object's `expire_channels`, which this method only forwards to.
pub(crate) fn expire_channels(&self) -> Duration {
self.channels.expire_channels()
}
/// Test only: look up `ident` in the channel map without waiting or
/// launching anything.
///
/// Returns a clone of the channel if the entry is open and usable, and
/// `None` otherwise.
///
/// # Panics
///
/// Panics if the channel state cannot be accessed.
#[cfg(test)]
pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Option<CF::Channel>
where
    T: Into<tor_linkspec::RelayIdRef<'a>>,
{
    use state::ChannelState::*;
    let lookup = self
        .channels
        .with_channels(|channel_map| match channel_map.by_id(ident) {
            Some(Open(entry)) if entry.channel.is_usable() => Some(entry.channel.clone()),
            _ => None,
        });
    lookup.expect("Poisoned lock")
}
}
/// Possible actions that we'll decide to take when asked for a channel.
// NOTE(review): no justification is recorded for the lint allowance below;
// presumably boxing a variant was judged not worthwhile. Confirm.
#[allow(clippy::large_enum_variant)]
enum Action<C> {
/// We found no channel. We're going to launch a new one,
/// then use this sender to tell everybody about the outcome.
Launch(Sending),
/// We found an in-progress attempt at making a channel.
/// We're going to wait for it to finish.
Wait(Pending),
/// We already know the answer. It is either a usable channel that
/// we're going to return, or an error (such as an identity conflict)
/// that we're going to report right away.
Return(Result<C>),
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::Error;
use futures::join;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tor_error::bad_api_usage;
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use crate::ChannelUsage as CU;
use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime};
/// A channel factory for tests: it makes `FakeChannel`s without touching
/// the network.
#[derive(Clone)]
struct FakeChannelFactory<RT> {
/// Runtime used to sleep when a build is asked to be slow.
runtime: RT,
}
/// A stand-in channel for tests.
#[derive(Clone, Debug)]
struct FakeChannel {
/// The Ed25519 identity that this channel reports via `HasRelayIds`.
ed_ident: Ed25519Identity,
/// The "mood" character copied from the build spec that made this channel.
mood: char,
/// Set to true once the channel should be treated as unusable.
closing: Arc<AtomicBool>,
/// Allocation whose address tells separately built channels apart;
/// clones share it.
detect_reuse: Arc<char>,
// last_params: Option<ChannelPaddingInstructionsUpdates>,
}
impl PartialEq for FakeChannel {
/// Two fake channels are equal only if they share the same
/// `detect_reuse` allocation, i.e. one is a clone of the other.
/// Channels from separate builds never compare equal, even with
/// identical field values.
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
}
}
impl AbstractChannel for FakeChannel {
/// A fake channel is usable until `start_closing` has been called on it
/// (or on one of its clones, since they share the flag).
fn is_usable(&self) -> bool {
!self.closing.load(Ordering::SeqCst)
}
/// Fake channels always claim to be in use.
fn duration_unused(&self) -> Option<Duration> {
None
}
/// Accept and discard any parameter update.
fn reparameterize(
&self,
_updates: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()> {
// *self.last_params.lock().unwrap() = Some((*updates).clone());
Ok(())
}
/// No-op: fake channels do no padding.
fn engage_padding_activities(&self) {}
}
impl HasRelayIds for FakeChannel {
    /// A fake channel only has an Ed25519 identity; every other key type
    /// yields `None`.
    fn identity(
        &self,
        key_type: tor_linkspec::RelayIdType,
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
        if matches!(key_type, tor_linkspec::RelayIdType::Ed25519) {
            Some((&self.ed_ident).into())
        } else {
            None
        }
    }
}
impl FakeChannel {
/// Mark this channel, and every clone sharing its flag, as no longer
/// usable.
fn start_closing(&self) {
self.closing.store(true, Ordering::SeqCst);
}
}
impl<RT: Runtime> FakeChannelFactory<RT> {
    /// Make a fake factory that uses `runtime` for its sleeps.
    fn new(runtime: RT) -> Self {
        Self { runtime }
    }
}
/// Build an `AbstractChanMgr` around a `FakeChannelFactory`, using default
/// configuration, dormancy and network parameters and a fake reporter.
fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
    let factory = FakeChannelFactory::new(runtime);
    let config = ChannelConfig::default();
    let reporter = BootstrapReporter::fake();
    AbstractChanMgr::new(
        factory,
        &config,
        Default::default(),
        &Default::default(),
        reporter,
    )
}
/// Instructions for building a `FakeChannel`.
///
/// The fields are, in order: a number identifying the target, a "mood"
/// character that selects how the build behaves, and the target's Ed25519
/// identity. The fake factory asserts that the identity equals
/// `u32_to_ed` of the number.
#[derive(Clone, Debug)]
struct FakeBuildSpec(u32, char, Ed25519Identity);
impl HasRelayIds for FakeBuildSpec {
    /// A fake build spec only names an Ed25519 identity (its third field);
    /// every other key type yields `None`.
    fn identity(
        &self,
        key_type: tor_linkspec::RelayIdType,
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
        if matches!(key_type, tor_linkspec::RelayIdType::Ed25519) {
            Some((&self.2).into())
        } else {
            None
        }
    }
}
/// Helper: derive a fake Ed25519 identity from `n`.
///
/// The first four bytes hold `n` in big-endian order; the remaining 28
/// bytes are zero.
fn u32_to_ed(n: u32) -> Ed25519Identity {
    let mut raw = [0_u8; 32];
    raw[..4].copy_from_slice(&n.to_be_bytes());
    raw.into()
}
#[async_trait]
impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
type Channel = FakeChannel;
type BuildSpec = FakeBuildSpec;
async fn build_channel(
&self,
target: &Self::BuildSpec,
_reporter: BootstrapReporter,
) -> Result<FakeChannel> {
yield_now().await;
let FakeBuildSpec(ident, mood, id) = *target;
let ed_ident = u32_to_ed(ident);
assert_eq!(ed_ident, id);
match mood {
// "X" means never connect.
'❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
// "zzz" means wait for 15 seconds then succeed.
'💤' => {
self.runtime.sleep(Duration::new(15, 0)).await;
}
_ => {}
}
Ok(FakeChannel {
ed_ident,
mood,
closing: Arc::new(AtomicBool::new(false)),
detect_reuse: Default::default(),
// last_params: None,
})
}
}
/// A second request for the same target reuses the channel built by the
/// first, and that channel is also visible through `get_nowait`.
#[test]
fn connect_one_ok() {
    test_with_one_runtime!(|runtime| async {
        let mgr = new_test_abstract_chanmgr(runtime);
        let target = FakeBuildSpec(413, '!', u32_to_ed(413));

        let (first, _) = mgr
            .get_or_launch(target.clone(), CU::UserTraffic)
            .await
            .unwrap();
        let (second, _) = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap();
        assert_eq!(first, second);

        let stored = mgr.get_nowait(&u32_to_ed(413)).unwrap();
        assert_eq!(first, stored);
    });
}
/// A failed build is reported to the caller and leaves no open channel
/// behind.
#[test]
fn connect_one_fail() {
    test_with_one_runtime!(|runtime| async {
        let mgr = new_test_abstract_chanmgr(runtime);

        // This is set up to always fail.
        let failing = FakeBuildSpec(999, '❌', u32_to_ed(999));
        let outcome = mgr.get_or_launch(failing, CU::UserTraffic).await;
        assert!(matches!(outcome, Err(Error::UnusableTarget(_))));

        assert!(mgr.get_nowait(&u32_to_ed(999)).is_none());
    });
}
/// Several requests made together: requests for the same identity share
/// one outcome, and different identities get different channels.
#[test]
fn test_concurrent() {
    test_with_one_runtime!(|runtime| async {
        let mgr = new_test_abstract_chanmgr(runtime);
        // Shorthand for a spec whose identity is derived from its number.
        let spec = |n: u32, mood: char| FakeBuildSpec(n, mood, u32_to_ed(n));

        // TODO(nickm): figure out how to make these actually run
        // concurrently. Right now it seems that they don't actually
        // interact.
        let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!(
            mgr.get_or_launch(spec(3, 'a'), CU::UserTraffic),
            mgr.get_or_launch(spec(3, 'b'), CU::UserTraffic),
            mgr.get_or_launch(spec(44, 'a'), CU::UserTraffic),
            mgr.get_or_launch(spec(44, 'b'), CU::UserTraffic),
            mgr.get_or_launch(spec(86, '❌'), CU::UserTraffic),
            mgr.get_or_launch(spec(86, '🔥'), CU::UserTraffic),
        );

        let (ch3a, ch3b) = (ch3a.unwrap(), ch3b.unwrap());
        let (ch44a, ch44b) = (ch44a.unwrap(), ch44b.unwrap());
        let (err_a, err_b) = (ch86a.unwrap_err(), ch86b.unwrap_err());

        assert_eq!(ch3a, ch3b);
        assert_eq!(ch44a, ch44b);
        assert_ne!(ch44a, ch3a);

        assert!(matches!(err_a, Error::UnusableTarget(_)));
        assert!(matches!(err_b, Error::UnusableTarget(_)));
    });
}
/// An unusable channel is replaced on the next request, and
/// `remove_unusable_entries` drops unusable channels that nobody asked for
/// again.
#[test]
fn unusable_entries() {
    test_with_one_runtime!(|runtime| async {
        let mgr = new_test_abstract_chanmgr(runtime);

        let (ch3, ch4, ch5) = join!(
            mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
            mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic),
            mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic),
        );
        let (ch3, _) = ch3.unwrap();
        let _ch4 = ch4.unwrap();
        let (ch5, _) = ch5.unwrap();

        // Channels 3 and 5 become unusable; channel 4 stays healthy.
        ch3.start_closing();
        ch5.start_closing();

        // Asking for 3 again must build a replacement channel.
        let (ch3_new, _) = mgr
            .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic)
            .await
            .unwrap();
        assert_ne!(ch3, ch3_new);
        assert_eq!(ch3_new.mood, 'b');

        mgr.remove_unusable_entries().unwrap();

        assert!(mgr.get_nowait(&u32_to_ed(3)).is_some());
        assert!(mgr.get_nowait(&u32_to_ed(4)).is_some());
        assert!(mgr.get_nowait(&u32_to_ed(5)).is_none());
    });
}
}