use arti_client::rpc::{
ClientConnectionResult, ConnectWithPrefs, ResolvePtrWithPrefs, ResolveWithPrefs,
};
use derive_deftly::Deftly;
use std::{
net::IpAddr,
sync::{Arc, Mutex},
};
use tor_error::into_internal;
use tor_proto::client::stream::ClientDataStreamCtrl;
use tor_rpcbase::{self as rpc, templates::*};
use crate::RpcSession;
/// An RPC object that wraps a "connector" object and allows it to be used
/// for at most one connect, resolve, or resolve-ptr operation.
///
/// The current state is tracked in `inner`; once an operation has claimed the
/// connector, further operations fail with `OneshotClientError::AlreadyUsed`.
#[derive(Deftly)]
#[derive_deftly(Object)]
#[deftly(rpc(expose_outside_of_session))]
pub(crate) struct OneshotClient {
/// The state of this object, guarded by a (synchronous) mutex.
inner: Mutex<Inner>,
}
/// The state of a `OneshotClient`.
///
/// Only the `Unused` state allows the connector to be taken; every other
/// state means that the object has already been used.
enum Inner {
/// No operation has been started yet.
///
/// Holds the connector object that the first operation will be delegated to.
Unused(Arc<dyn rpc::Object>),
/// A connect operation has taken the connector, and its outcome has not
/// been recorded yet.
Launching,
/// A connect operation produced a stream.
///
/// Holds the control handle of that stream.
Stream(Arc<ClientDataStreamCtrl>),
/// The connector was taken by a resolve or resolve-ptr operation.
UsedToResolve,
/// A connect operation was delegated to the connector, and the connector
/// reported an error.
StreamFailed,
}
/// An error returned when a `OneshotClient` cannot be used for an operation.
#[derive(Debug, Clone, thiserror::Error)]
enum OneshotClientError {
/// The object was not in its unused state: its connector has already been
/// taken by an earlier operation.
#[error("Data stream object already used")]
AlreadyUsed,
}
// Maps each `OneshotClientError` variant onto a `tor_error::ErrorKind`.
impl tor_error::HasKind for OneshotClientError {
    /// Return the error kind for this error.
    ///
    /// `AlreadyUsed` is reported as `ErrorKind::BadApiUsage`.
    fn kind(&self) -> tor_error::ErrorKind {
        match self {
            Self::AlreadyUsed => tor_error::ErrorKind::BadApiUsage,
        }
    }
}
impl OneshotClient {
    /// Construct a new, not-yet-used `OneshotClient` wrapping `connector`.
    ///
    /// The first connect or resolve operation on the returned object will
    /// take `connector` out of it.
    pub(crate) fn new(connector: Arc<dyn rpc::Object>) -> Self {
        let initial = Inner::Unused(connector);
        Self {
            inner: Mutex::new(initial),
        }
    }

    /// Take the connector out of this object, leaving `new_state` in its place.
    ///
    /// # Errors
    ///
    /// Returns `OneshotClientError::AlreadyUsed` if the object is not in the
    /// `Inner::Unused` state. In that case the existing state is left in
    /// place and `new_state` is discarded.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    fn take_connector(&self, new_state: Inner) -> Result<Arc<dyn rpc::Object>, OneshotClientError> {
        let mut state = self.inner.lock().expect("poisoned lock");
        // Swap first and inspect afterwards: this lets us move the connector
        // out of the old state without cloning it.
        match std::mem::replace(&mut *state, new_state) {
            Inner::Unused(connector) => Ok(connector),
            previous => {
                // Already used: put the old state back.
                *state = previous;
                Err(OneshotClientError::AlreadyUsed)
            }
        }
    }

    /// Return the control handle for the stream held by this object, if any.
    ///
    /// Yields `Some` only while the object is in the `Inner::Stream` state.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    // Nothing in this part of the file calls this method, hence the `allow`.
    #[allow(dead_code)]
    fn get_ctrl(&self) -> Option<Arc<ClientDataStreamCtrl>> {
        let state = self.inner.lock().expect("poisoned lock");
        match &*state {
            Inner::Stream(ctrl) => Some(ctrl.clone()),
            _ => None,
        }
    }
}
/// Handle a `ConnectWithPrefs` request whose target is a `OneshotClient`.
///
/// Takes the wrapped connector (moving the object into the `Launching`
/// state), delegates the connect request to it, and then records either the
/// resulting stream's control handle or the failure in the object.
///
/// # Errors
///
/// Fails if the object has already been used, if the request cannot be
/// delegated to the connector, if the connector reports an error, or (for
/// requests that were not optimistic to begin with) if waiting for the
/// connection fails.
async fn oneshot_client_connect_with_prefs(
    rpc_data_stream: Arc<OneshotClient>,
    mut method: Box<ConnectWithPrefs>,
    ctx: Arc<dyn rpc::Context>,
) -> ClientConnectionResult<arti_client::DataStream> {
    // Claim the connector; this is what enforces the "one shot" rule.
    let claimed = rpc_data_stream.take_connector(Inner::Launching);
    let connector = claimed.map_err(|e| Box::new(e) as _)?;

    // Remember what the caller asked for, then force the delegated request
    // to be optimistic.
    // NOTE(review): presumably this makes the delegated connect return
    // without waiting for the connection to complete; confirm against the
    // arti_client stream-preferences documentation.
    let caller_wanted_optimistic = method.prefs.is_optimistic();
    method.prefs.optimistic();

    // NOTE(review): if delegation itself fails, we return here and the
    // object stays in the `Launching` state.
    let outcome: Result<arti_client::DataStream, _> =
        *rpc::invoke_special_method(ctx, connector, method)
            .await
            .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;

    // Decide which state reflects the outcome, and install it.
    let next_state = match &outcome {
        Err(_) => Inner::StreamFailed,
        Ok(data_stream) => {
            let ctrl = data_stream
                .client_stream_ctrl()
                .expect("Created a client stream with no ClientDataStreamCtrl!?");
            Inner::Stream(ctrl.clone())
        }
    };
    // The guard is a temporary, so the lock is released at the end of this
    // statement, before any later `.await`.
    *rpc_data_stream.inner.lock().expect("poisoned lock") = next_state;

    // The state is recorded; now propagate any connector error.
    let mut data_stream = outcome?;
    if caller_wanted_optimistic {
        return Ok(data_stream);
    }

    // The caller did not ask for an optimistic stream, so wait for the
    // connection before handing the stream back.
    data_stream
        .wait_for_connection()
        .await
        .map_err(|e| Box::new(e) as _)?;
    Ok(data_stream)
}
/// Handle a `ResolveWithPrefs` request whose target is a `OneshotClient`.
///
/// Takes the wrapped connector (moving the object into the `UsedToResolve`
/// state) and delegates the request to it, returning the connector's answer.
///
/// # Errors
///
/// Fails if the object has already been used, if the request cannot be
/// delegated to the connector, or if the connector reports an error.
async fn oneshot_client_resolve_with_prefs(
    rpc_data_stream: Arc<OneshotClient>,
    method: Box<ResolveWithPrefs>,
    ctx: Arc<dyn rpc::Context>,
) -> ClientConnectionResult<Vec<IpAddr>> {
    // Claim the connector; a second operation on this object will fail here.
    let claimed = rpc_data_stream.take_connector(Inner::UsedToResolve);
    let connector = claimed.map_err(|e| Box::new(e) as _)?;

    // Forward the request to the connector unchanged.
    let delegated = rpc::invoke_special_method(ctx, connector, method).await;
    let answer =
        delegated.map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;

    // The delegated result arrives boxed; unbox it for our caller.
    *answer
}
/// Handle a `ResolvePtrWithPrefs` request whose target is a `OneshotClient`.
///
/// Takes the wrapped connector (moving the object into the `UsedToResolve`
/// state) and delegates the request to it, returning the connector's answer.
///
/// # Errors
///
/// Fails if the object has already been used, if the request cannot be
/// delegated to the connector, or if the connector reports an error.
async fn oneshot_client_resolve_ptr_with_prefs(
    rpc_data_stream: Arc<OneshotClient>,
    method: Box<ResolvePtrWithPrefs>,
    ctx: Arc<dyn rpc::Context>,
) -> ClientConnectionResult<Vec<String>> {
    // Claim the connector; a second operation on this object will fail here.
    let claimed = rpc_data_stream.take_connector(Inner::UsedToResolve);
    let connector = claimed.map_err(|e| Box::new(e) as _)?;

    // Forward the request to the connector unchanged.
    let delegated = rpc::invoke_special_method(ctx, connector, method).await;
    let answer =
        delegated.map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;

    // The delegated result arrives boxed; unbox it for our caller.
    *answer
}
/// The RPC method `arti:new_oneshot_client`: create a new `OneshotClient`.
///
/// The method takes no parameters (this struct has no fields).
#[derive(Debug, serde::Deserialize, serde::Serialize, Deftly)]
#[derive_deftly(DynMethod)]
#[deftly(rpc(method_name = "arti:new_oneshot_client"))]
pub(crate) struct NewOneshotClient {}
// Declares the reply types for `NewOneshotClient`: the output is an
// `rpc::SingleIdResponse`, and the update type is `rpc::NoUpdates`.
impl rpc::RpcMethod for NewOneshotClient {
type Output = rpc::SingleIdResponse;
type Update = rpc::NoUpdates; }
/// Wrap `connector` in a new `OneshotClient`, register it with `ctx` as an
/// owned object, and return the ID that `ctx` assigned to it.
///
/// Shared by the per-object-type `new_oneshot_client_on_*` handlers.
fn new_oneshot_client_impl(
    connector: Arc<dyn rpc::Object>,
    ctx: &dyn rpc::Context,
) -> rpc::ObjectId {
    ctx.register_owned(Arc::new(OneshotClient::new(connector)) as _)
}
/// Implement `NewOneshotClient` on a `TorClient`.
///
/// Creates a `OneshotClient` that uses `client` as its connector, registers
/// it with `ctx`, and replies with the new object's ID.
// NOTE(review): this handler is not listed in the `static_rpc_invoke_fn!`
// block of this file; presumably it is registered elsewhere because it is
// generic over the runtime. Confirm against the caller.
pub(crate) async fn new_oneshot_client_on_client<R>(
    client: Arc<arti_client::TorClient<R>>,
    _method: Box<NewOneshotClient>,
    ctx: Arc<dyn rpc::Context>,
) -> Result<rpc::SingleIdResponse, rpc::RpcError>
where
    R: tor_rtcompat::Runtime,
{
    let object_id = new_oneshot_client_impl(client, &*ctx);
    Ok(object_id.into())
}
/// Implement `NewOneshotClient` on an `RpcSession`.
///
/// Creates a `OneshotClient` that uses `session` as its connector, registers
/// it with `ctx`, and replies with the new object's ID.
async fn new_oneshot_client_on_session(
    session: Arc<RpcSession>,
    _method: Box<NewOneshotClient>,
    ctx: Arc<dyn rpc::Context>,
) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
    let object_id = new_oneshot_client_impl(session, &*ctx);
    Ok(object_id.into())
}
// Register the non-generic handlers defined in this file with the RPC
// dispatch machinery.
//
// NOTE(review): the `@special` entries are presumably the handlers reached
// through `rpc::invoke_special_method` rather than by method name; confirm
// against the tor_rpcbase documentation for this macro.
rpc::static_rpc_invoke_fn! {
new_oneshot_client_on_session;
@special oneshot_client_connect_with_prefs;
@special oneshot_client_resolve_with_prefs;
@special oneshot_client_resolve_ptr_with_prefs;
}