use arti_client::TorClient;
use arti_rpcserver::RpcAuthentication;
use derive_deftly::Deftly;
use futures::stream::StreamExt as _;
use std::sync::Arc;
use tor_async_utils::{DropNotifyEofSignallable, DropNotifyWatchSender};
use tor_rpcbase::{self as rpc};
use tor_rtcompat::Runtime;
use crate::proxy::port_info;
use super::proxyinfo::{self, ProxyInfo};
/// An RPC object wrapping an [`arti_rpcserver::RpcSession`] together with
/// state owned by this crate.
///
/// RPC method dispatch that this type does not handle itself is delegated to
/// the inner `session` (see the `delegate_with` / `delegate_type` attributes).
#[derive(Deftly)]
#[derive_deftly(rpc::Object)]
#[deftly(rpc(
delegate_with = "|this: &Self| Some(this.session.clone())",
delegate_type = "arti_rpcserver::RpcSession"
))]
// NOTE(review): `expose_outside_of_session` presumably allows this object to be
// reached without an enclosing session -- confirm against the tor_rpcbase docs.
#[deftly(rpc(expose_outside_of_session))]
pub(super) struct ArtiRpcSession {
/// Shared, RPC-visible state; handed in by the caller of `new`.
pub(super) arti_state: Arc<RpcVisibleArtiState>,
/// The underlying session object that this type delegates to.
session: Arc<arti_rpcserver::RpcSession>,
}
/// State that RPC sessions are able to read.
///
/// Created (together with its paired [`RpcStateSender`]) by
/// `RpcVisibleArtiState::new`.
pub(crate) struct RpcVisibleArtiState {
/// Receiving end of the watch channel carrying the current `ProxyInfoState`.
proxy_info: postage::watch::Receiver<ProxyInfoState>,
}
/// Write-side handle used to publish values into an [`RpcVisibleArtiState`].
///
/// Returned by `RpcVisibleArtiState::new` alongside the state it feeds.
#[derive(Debug)]
pub(crate) struct RpcStateSender {
/// Sending end of the watch channel carrying the current `ProxyInfoState`,
/// wrapped in a `DropNotifyWatchSender`.
// NOTE(review): the wrapper presumably publishes `ProxyInfoState::eof()` when
// dropped -- confirm in tor_async_utils.
proxy_info_sender: DropNotifyWatchSender<ProxyInfoState>,
}
impl ArtiRpcSession {
    /// Construct a new `ArtiRpcSession`.
    ///
    /// The inner RPC session is built around an isolated client derived from
    /// `client_root`, and the returned object shares ownership of
    /// `arti_state`.
    ///
    /// `auth` is accepted but is not consulted at present.
    pub(super) fn new<R: Runtime>(
        auth: &RpcAuthentication,
        client_root: &TorClient<R>,
        arti_state: &Arc<RpcVisibleArtiState>,
    ) -> Arc<Self> {
        // The authentication result is intentionally unused for now.
        let _ = auth;

        let isolated = client_root.isolated_client();
        Arc::new(Self {
            session: arti_rpcserver::RpcSession::new_with_client(Arc::new(isolated)),
            arti_state: Arc::clone(arti_state),
        })
    }
}
/// The value carried on the proxy-info watch channel.
#[derive(Debug, Clone)]
enum ProxyInfoState {
/// No proxy information has been published yet; this is the channel's
/// initial value.
Unset,
/// Proxy information has been published via `set_stream_listeners`.
Set(Arc<ProxyInfo>),
/// The value produced by this type's `DropNotifyEofSignallable::eof`;
/// `get_proxy_info` reports an error when it observes it.
Eof,
}
impl DropNotifyEofSignallable for ProxyInfoState {
fn eof() -> Self {
Self::Eof
}
}
impl RpcVisibleArtiState {
    /// Create a new `RpcVisibleArtiState` and the [`RpcStateSender`] that
    /// feeds it.
    ///
    /// The underlying watch channel starts out holding
    /// `ProxyInfoState::Unset`.
    pub(crate) fn new() -> (Arc<Self>, RpcStateSender) {
        let (tx, rx) = postage::watch::channel_with(ProxyInfoState::Unset);
        let sender = RpcStateSender {
            proxy_info_sender: DropNotifyWatchSender::new(tx),
        };
        let state = Arc::new(Self { proxy_info: rx });
        (state, sender)
    }

    /// Wait until proxy information is available, and return it.
    ///
    /// Each call reads from its own clone of the stored receiver.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if an `Eof` value is observed, or if the stream of
    /// updates ends without ever yielding a `Set` value.
    pub(super) async fn get_proxy_info(&self) -> Result<Arc<ProxyInfo>, ()> {
        let mut updates = self.proxy_info.clone();
        loop {
            match updates.next().await {
                // Nothing published yet: keep waiting for the next update.
                Some(ProxyInfoState::Unset) => continue,
                Some(ProxyInfoState::Set(info)) => return Ok(info),
                // Explicit end-of-stream marker, or the stream itself ended.
                Some(ProxyInfoState::Eof) | None => return Err(()),
            }
        }
    }
}
impl RpcStateSender {
    /// Publish the set of stream listeners described by `ports`.
    ///
    /// Every port for which `ProxyListener::try_from_portinfo` yields a
    /// listener becomes one `Proxy` entry; ports for which it yields `None`
    /// are skipped. The resulting `ProxyInfo` replaces whatever value the
    /// channel currently holds.
    pub(crate) fn set_stream_listeners(&mut self, ports: &[port_info::Port]) {
        let proxies = ports
            .iter()
            .filter_map(|port| {
                proxyinfo::ProxyListener::try_from_portinfo(port)
                    .map(|listener| proxyinfo::Proxy { listener })
            })
            .collect();
        let new_state = ProxyInfoState::Set(Arc::new(ProxyInfo { proxies }));
        *self.proxy_info_sender.borrow_mut() = new_state;
    }
}
/// Unit tests for the RPC-visible state channel.
#[cfg(test)]
mod test {
// Lints relaxed for test code.
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use tor_rtcompat::SpawnExt as _;
use tor_rtmock::MockRuntime;
use super::*;
/// Publishing listeners from a spawned task makes `get_proxy_info` resolve,
/// and a second call yields an equal value.
#[test]
fn set_proxy_info() {
MockRuntime::test_with_various(|rt| async move {
let (state, mut sender) = RpcVisibleArtiState::new();
// Publish from a separate task, so the reads below may start waiting
// before the value has been set.
let _task = rt.clone().spawn_with_handle(async move {
sender.set_stream_listeners(&[port_info::Port {
protocol: port_info::SupportedProtocol::Socks,
address: "8.8.8.8:40".parse().unwrap(),
}]);
// Hand the sender back as the task's output instead of dropping it here.
// NOTE(review): presumably this keeps the channel from signalling Eof
// before the reads below complete -- confirm.
sender });
let value = state.get_proxy_info().await;
// Ask a second time, after a value has already been delivered once.
let value_again = state.get_proxy_info().await;
assert_eq!(value.unwrap(), value_again.unwrap());
});
}
}