arti_rpc_client_core/conn/
stream.rs1use std::{
4 io::{Error as IoError, Read as _, Write as _},
5 net::{SocketAddr, TcpStream},
6 sync::Arc,
7};
8
9use serde::Deserialize;
10
11use super::{ErrorResponse, NoParams, RpcConn};
12use crate::{
13 ObjectId,
14 msgs::{request::Request, response::RpcErrorCode},
15};
16
17use tor_error::ErrorReport as _;
18
19#[derive(Clone, Debug, thiserror::Error)]
21#[non_exhaustive]
22pub enum StreamError {
23 #[error("An error occurred while invoking RPC methods")]
25 RpcMethods(#[from] super::ProtoError),
26
27 #[error("Request for proxy info rejected: {0}")]
29 ProxyInfoRejected(ErrorResponse),
30
31 #[error("Request for new stream ID rejected: {0}")]
33 NewStreamRejected(ErrorResponse),
34
35 #[error("Request for new stream ID rejected: {0}")]
37 StreamReleaseRejected(ErrorResponse),
38
39 #[error("RPC connection not authenticated")]
44 NotAuthenticated,
45
46 #[error("Unable to access Session object")]
51 NoSession,
52
53 #[error("Internal error: {0}")]
56 Internal(String),
57
58 #[error("No SOCKS proxy available")]
60 NoProxy,
61
62 #[error("IO error")]
65 Io(#[source] Arc<IoError>),
66
67 #[error("Invalid SOCKS request")]
72 SocksRequest(#[source] tor_socksproto::Error),
73
74 #[error("SOCKS protocol violation")]
77 SocksProtocol(#[source] tor_socksproto::Error),
78
79 #[error("SOCKS error code {0}")]
81 SocksError(tor_socksproto::SocksStatus),
82}
83
84impl From<IoError> for StreamError {
85 fn from(e: IoError) -> Self {
86 Self::Io(Arc::new(e))
87 }
88}
89
90#[derive(Deserialize, Debug)]
92struct SingleIdReply {
93 id: ObjectId,
95}
96
97#[derive(Deserialize, Clone, Debug)]
100pub(super) struct Proxy {
101 pub(super) listener: ProxyListener,
103}
104
105#[derive(Deserialize, Clone, Debug)]
107pub(super) enum ProxyListener {
109 #[serde(rename = "socks5")]
111 Socks5 {
112 tcp_address: Option<SocketAddr>,
114 },
115 #[serde(untagged)]
117 Unrecognized {},
118}
119
120impl Proxy {
121 fn socks_addr(&self) -> Option<SocketAddr> {
123 match self.listener {
124 ProxyListener::Socks5 { tcp_address } => tcp_address,
125 ProxyListener::Unrecognized {} => None,
126 }
127 }
128}
129
130impl ProxyInfoReply {
131 fn find_socks_addr(&self) -> Option<SocketAddr> {
133 self.proxies.iter().find_map(Proxy::socks_addr)
135 }
136}
137
138#[derive(Deserialize, Clone, Debug)]
141pub(super) struct ProxyInfoReply {
142 pub(super) proxies: Vec<Proxy>,
146}
147
148impl RpcConn {
149 pub fn open_stream_as_object(
162 &self,
163 on_object: Option<&ObjectId>,
164 target: (&str, u16),
165 isolation: &str,
166 ) -> Result<(ObjectId, TcpStream), StreamError> {
167 let on_object = self.resolve_on_object(on_object)?;
168 let new_stream_request =
169 Request::new(on_object.clone(), "arti:new_oneshot_client", NoParams {});
170 let stream_id = self
171 .execute_internal::<SingleIdReply>(&new_stream_request.encode()?)?
172 .map_err(StreamError::NewStreamRejected)?
173 .id;
174
175 match self.open_stream(Some(&stream_id), target, isolation) {
176 Ok(tcp_stream) => Ok((stream_id, tcp_stream)),
177 Err(e) => {
178 if let Err(_inner) = self.release_obj(stream_id) {
179 }
181 Err(e)
182 }
183 }
184 }
185
186 pub fn open_stream(
198 &self,
199 on_object: Option<&ObjectId>,
200 (hostname, port): (&str, u16),
201 isolation: &str,
202 ) -> Result<TcpStream, StreamError> {
203 let on_object = self.resolve_on_object(on_object)?;
204 let socks_proxy_addr = self.lookup_socks_proxy_addr()?;
205 let mut stream = TcpStream::connect(socks_proxy_addr)?;
206
207 let username = format!("<torS0X>1{}", on_object.as_ref());
210 let password = isolation;
211 negotiate_socks(&mut stream, hostname, port, &username, password)?;
212
213 Ok(stream)
214 }
215
216 fn lookup_socks_proxy_addr(&self) -> Result<SocketAddr, StreamError> {
221 let session_id = self.session_id_required()?.clone();
222
223 let proxy_info_request: Request<NoParams> =
224 Request::new(session_id, "arti:get_rpc_proxy_info", NoParams {});
225 let cmd = proxy_info_request.encode()?;
226 let proxy_info = match self.execute_internal::<ProxyInfoReply>(&cmd)? {
227 Ok(info) => info,
228 Err(response) => {
229 if response.decode().code() == RpcErrorCode::OBJECT_ERROR {
230 return Err(StreamError::NoSession);
233 } else {
234 return Err(response.internal_error(&cmd).into());
235 }
236 }
237 };
238 let socks_proxy_addr = proxy_info.find_socks_addr().ok_or(StreamError::NoProxy)?;
239
240 Ok(socks_proxy_addr)
241 }
242
243 fn session_id_required(&self) -> Result<&ObjectId, StreamError> {
245 self.session().ok_or(StreamError::NotAuthenticated)
246 }
247
248 fn resolve_on_object(&self, on_object: Option<&ObjectId>) -> Result<ObjectId, StreamError> {
250 Ok(match on_object {
251 Some(obj) => obj.clone(),
252 None => self.session_id_required()?.clone(),
253 })
254 }
255}
256
257fn negotiate_socks(
263 stream: &mut TcpStream,
264 hostname: &str,
265 port: u16,
266 username: &str,
267 password: &str,
268) -> Result<(), StreamError> {
269 use StreamError as E;
270 use tor_socksproto::{
271 Handshake as _, SocksAddr, SocksAuth, SocksClientHandshake, SocksCmd, SocksHostname,
272 SocksRequest, SocksStatus, SocksVersion,
273 };
274
275 let request = SocksRequest::new(
276 SocksVersion::V5,
277 SocksCmd::CONNECT,
278 SocksAddr::Hostname(SocksHostname::try_from(hostname.to_owned()).map_err(E::SocksRequest)?),
279 port,
280 SocksAuth::Username(
281 username.to_owned().into_bytes(),
282 password.to_owned().into_bytes(),
283 ),
284 )
285 .map_err(E::SocksRequest)?;
286
287 let mut buf = tor_socksproto::Buffer::new_precise();
288 let mut state = SocksClientHandshake::new(request);
289 let reply = loop {
290 use tor_socksproto::NextStep as NS;
291 match state.step(&mut buf).map_err(E::SocksProtocol)? {
292 NS::Recv(mut recv) => {
293 let n = stream.read(recv.buf())?;
294 recv.note_received(n).map_err(E::SocksProtocol)?;
295 }
296 NS::Send(send) => stream.write_all(&send)?,
297 NS::Finished(fin) => {
298 break fin
299 .into_output()
300 .map_err(|bug| E::Internal(bug.report().to_string()))?;
301 }
302 }
303 };
304
305 let status = reply.status();
306
307 if status == SocksStatus::SUCCEEDED {
308 Ok(())
309 } else {
310 Err(StreamError::SocksError(status))
311 }
312}
313
314#[cfg(test)]
315mod test {
316 #![allow(clippy::bool_assert_comparison)]
318 #![allow(clippy::clone_on_copy)]
319 #![allow(clippy::dbg_macro)]
320 #![allow(clippy::mixed_attributes_style)]
321 #![allow(clippy::print_stderr)]
322 #![allow(clippy::print_stdout)]
323 #![allow(clippy::single_char_pattern)]
324 #![allow(clippy::unwrap_used)]
325 #![allow(clippy::unchecked_time_subtraction)]
326 #![allow(clippy::useless_vec)]
327 #![allow(clippy::needless_pass_by_value)]
328 use super::*;
331
332 #[test]
333 fn unexpected_proxies() {
334 let p: ProxyInfoReply = serde_json::from_str(
335 r#"
336 { "proxies" : [ {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}} ] }
337 "#,
338 )
339 .unwrap();
340 assert_eq!(p.proxies.len(), 1);
341 match p.proxies[0].listener {
342 ProxyListener::Socks5 {
343 tcp_address: address,
344 } => {
345 assert_eq!(address.unwrap(), "127.0.0.1:9090".parse().unwrap());
346 }
347 ref other => panic!("{:?}", other),
348 };
349
350 let p: ProxyInfoReply = serde_json::from_str(
351 r#"
352 { "proxies" : [
353 {"listener" : {"hypothetical" : {"tzitzel" : "buttered" }}},
354 {"listener" : {"socks5" : {"unix_path" : "/home/username/.local/PROXY"}}},
355 {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}},
356 {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9999" }}}
357 ] }
358 "#,
359 )
360 .unwrap();
361 assert_eq!(
362 p.find_socks_addr().unwrap(),
363 "127.0.0.1:9090".parse().unwrap()
364 );
365 }
366}