use crate::circuit::UniqId;
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
use crate::circuit::reactor::circhop::CircHopList;
use crate::circuit::reactor::stream::{ReadyStreamMsg, StreamHandler, StreamMsg, StreamReactor};
use crate::congestion::CongestionControl;
use crate::memquota::CircuitAccount;
use crate::util::err::ReactorError;
use crate::{Error, HopNum, Result};
#[cfg(any(feature = "hs-service", feature = "relay"))]
use crate::stream::incoming::IncomingStreamRequestHandler;
use tor_error::internal;
use tor_rtcompat::Runtime;
use futures::SinkExt;
use futures::channel::mpsc;
use std::result::Result as StdResult;
use std::sync::{Arc, Mutex, RwLock};
/// Owner of a circuit's hop list and of the channels leading to the
/// per-hop stream reactors.
///
/// Stream reactors are spawned on demand: the first message sent to a hop
/// spawns that hop's reactor, and the resulting sender is stored in the hop
/// list entry for reuse.
pub(crate) struct HopMgr<R: Runtime> {
/// Runtime handle, used to spawn stream reactors; a clone is also handed
/// to each reactor.
runtime: R,
/// State handed to every stream reactor when it is created.
ctx: StreamReactorContext,
/// Sender for [`ReadyStreamMsg`]s; a clone is handed to each stream reactor.
// NOTE(review): the receiving end is not visible in this file.
bwd_tx: mpsc::Sender<ReadyStreamMsg>,
/// The hop list, behind a shared lock so that holders of the `Arc`
/// returned by `hops()` can access it too.
hops: Arc<RwLock<CircHopList>>,
/// Memory quota account; a clone is handed to each stream reactor.
memquota: CircuitAccount,
}
/// The pieces of state that are passed to every stream reactor spawned by
/// a [`HopMgr`].
struct StreamReactorContext {
/// Identifier passed to each stream reactor at construction.
// NOTE(review): presumably the circuit's unique ID, used for logging — confirm.
unique_id: UniqId,
/// Slot for the handler of incoming stream requests.
///
/// Starts out as `None`; `HopMgr::set_incoming_handler` fills it in.
/// The `Arc` is shared with every stream reactor.
#[cfg(any(feature = "hs-service", feature = "relay"))]
incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
/// The stream handler, shared with every stream reactor.
handler: Arc<dyn StreamHandler>,
}
impl<R: Runtime> HopMgr<R> {
pub(crate) fn new<S: StreamHandler>(
runtime: R,
unique_id: UniqId,
handler: S,
bwd_tx: mpsc::Sender<ReadyStreamMsg>,
memquota: CircuitAccount,
) -> Self {
let hops = Arc::new(RwLock::new(Default::default()));
let ctx = StreamReactorContext {
unique_id,
#[cfg(any(feature = "hs-service", feature = "relay"))]
incoming: Arc::new(Mutex::new(None)),
handler: Arc::new(handler),
};
Self {
runtime,
hops,
ctx,
bwd_tx,
memquota,
}
}
pub(crate) fn hops(&self) -> &Arc<RwLock<CircHopList>> {
&self.hops
}
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) fn set_incoming_handler(&self, handler: IncomingStreamRequestHandler) -> Result<()> {
let mut lock = self.ctx.incoming.lock().expect("poisoned lock");
if lock.is_none() {
*lock = Some(handler);
Ok(())
} else {
Err(Error::from(internal!(
"Tried to install a BEGIN cell handler before the old one was gone."
)))
}
}
pub(crate) fn add_hop(&mut self, settings: HopSettings) -> Result<()> {
let mut hops = self.hops.write().expect("poisoned lock");
hops.add_hop(settings)
}
pub(crate) async fn send(
&mut self,
hopnum: Option<HopNum>,
msg: StreamMsg,
) -> StdResult<(), ReactorError> {
let mut tx = self.get_or_spawn_stream_reactor(hopnum)?;
tx.send(msg).await.map_err(|_| {
ReactorError::Shutdown
})
}
fn get_or_spawn_stream_reactor(
&self,
hopnum: Option<HopNum>,
) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
let mut hops = self.hops.write().expect("poisoned lock");
let hop = hops
.get_mut(hopnum)
.ok_or_else(|| internal!("tried to send cell to nonexistent hop?!"))?;
let tx = match &hop.tx {
Some(tx) => tx.clone(),
None => {
let tx =
self.spawn_stream_reactor(hopnum, &hop.settings, Arc::clone(&hop.ccontrol))?;
hop.tx = Some(tx.clone());
tx
}
};
Ok(tx)
}
fn spawn_stream_reactor(
&self,
hopnum: Option<HopNum>,
settings: &HopSettings,
ccontrol: Arc<Mutex<CongestionControl>>,
) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
use tor_rtcompat::SpawnExt as _;
#[allow(clippy::disallowed_methods)]
let (fwd_stream_tx, fwd_stream_rx) = mpsc::channel(0);
let flow_ctrl_params = Arc::new(settings.flow_ctrl_params.clone());
let relay_format = settings.relay_crypt_protocol().relay_cell_format();
let outbound = CircHopOutbound::new(ccontrol, relay_format, flow_ctrl_params, settings);
let stream_reactor = StreamReactor::new(
self.runtime.clone(),
hopnum,
outbound,
self.ctx.unique_id,
fwd_stream_rx,
self.bwd_tx.clone(),
Arc::clone(&self.ctx.handler),
#[cfg(any(feature = "hs-service", feature = "relay"))]
Arc::clone(&self.ctx.incoming),
self.memquota.clone(),
);
self.runtime
.spawn(async {
let _ = stream_reactor.run().await;
})
.map_err(|_| ReactorError::Shutdown)?;
Ok(fwd_stream_tx)
}
}