tor_hsclient/state.rs
1//! Implement a cache for onion descriptors and the facility to remember a bit
2//! about onion service history.
3
4use std::fmt::Debug;
5use std::mem;
6use std::panic::AssertUnwindSafe;
7use std::sync::{Arc, Mutex, MutexGuard};
8use std::time::{Duration, Instant};
9
10use futures::FutureExt as _;
11use futures::task::{SpawnError, SpawnExt as _};
12
13use async_trait::async_trait;
14use educe::Educe;
15use either::Either::{self, *};
16use postage::stream::Stream as _;
17use tracing::{debug, error, instrument, trace};
18
19use safelog::DisplayRedacted as _;
20use tor_basic_utils::define_accessor_trait;
21use tor_circmgr::isolation::Isolation;
22use tor_error::{Bug, ErrorReport as _, debug_report, error_report, internal};
23use tor_hscrypto::pk::HsId;
24use tor_netdir::NetDir;
25use tor_rtcompat::Runtime;
26
27use crate::isol_map;
28use crate::{ConnError, HsClientConnector, HsClientSecretKeys};
29
// Key type used to address entries of the `Services::records` table.
//
// Values of this type are obtained from `index_or_insert_with` and later
// passed to `by_index_mut`, so that tasks can find "their" entry again
// after having released the lock on `Services`.
slotmap_careful::new_key_type! {
    struct TableIndex;
}
33
/// Configuration, currently just some retry parameters
///
/// Held by [`Services`] inside an `Arc`, and handed (as `Arc<Config>`)
/// to [`MockableConnectorData::connect`].
#[derive(Default, Debug)]
// This is not really public.
// It has to be `pub` because it appears in one of the methods in `MockableConnectorData`.
// That has to be because that trait is a bound on a parameter for `HsClientConnector`.
// `Config` is not re-exported.  (This is isomorphic to the trait sealing pattern.)
//
// This means that this struct cannot live in the crate root, so we put it here.
pub struct Config {
    /// Retry parameters
    pub(crate) retry: tor_circmgr::CircuitTiming,
}
46
define_accessor_trait! {
    /// Configuration for an HS client connector
    ///
    /// If the HS client connector gains new configurabilities, this trait will gain additional
    /// supertraits, as an API break.
    ///
    /// Prefer to use `TorClientConfig`, which will always implement this trait.
    //
    // This arrangement is very like that for `CircMgrConfig`.
    //
    // NOTE(review): presumably the macro turns the `circuit_timing` line below into
    // an accessor for a `tor_circmgr::CircuitTiming` -- confirm against
    // `tor_basic_utils::define_accessor_trait`.
    pub trait HsClientConnectorConfig {
        circuit_timing: tor_circmgr::CircuitTiming,
    }
}
60
/// Number of times we're willing to iterate round the state machine loop
///
/// **Not** the number of retries of failed descriptor downloads, circuits, etc.
///
/// The state machine loop is a condition variable loop.
/// It repeatedly transforms the [`ServiceState`] to try to get to `Open`,
/// converting stale data to `Closed` and `Closed` to `Working`, and so on.
/// This ought only to go forwards so in principle we could use an infinite loop.
/// But if we have a logic error, we want to give up eventually.
/// The `rechecks` counter is for detecting such a situation:
/// it is the range `0..MAX_RECHECKS`, created once per call to
/// `get_or_launch_connection` and consumed by `obtain_circuit_or_continuation_info`.
/// When it is exhausted, the request fails with an internal error.
///
/// This is fairly arbitrary, but we shouldn't get anywhere near it.
///
/// Note that this is **not** a number of operational retries
/// of fallible retriable operations.
/// Such retries are handled in [`connect.rs`](crate::connect).
const MAX_RECHECKS: u32 = 10;
78
/// C Tor `MaxCircuitDirtiness`
///
/// As per
///    <https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914433>
///
/// And C Tor's `tor(1)`, which says:
///
/// > MaxCircuitDirtiness NUM
/// >
/// > Feel free to reuse a circuit that was first used at most NUM
/// > seconds ago, but never attach a new stream to a circuit that is
/// > too old.  For hidden services, this applies to the last time a
/// > circuit was used, not the first.  Circuits with streams
/// > constructed with SOCKS authentication via SocksPorts that have
/// > KeepAliveIsolateSOCKSAuth also remain alive for
/// > MaxCircuitDirtiness seconds after carrying the last such
/// > stream.  (Default: 10 minutes)
///
/// However, we're not entirely sure this is the right behaviour.
/// See <https://gitlab.torproject.org/tpo/core/arti/-/issues/916>
///
/// The value here is 10 minutes, measured from the `last_used` field of
/// `ServiceState::Open`.  It is applied by the task created in
/// `ServiceState::spawn_circuit_expiry_task`.
///
// TODO SPEC: Explain C Tor `MaxCircuitDirtiness` behaviour
//
// TODO HS CFG: This should be configurable somehow
const RETAIN_CIRCUIT_AFTER_LAST_USE: Duration = Duration::from_secs(10 * 60);
104
/// How long to retain cached data about a hidden service
///
/// This is simply to reclaim space, not for correctness.
/// So we only check this during housekeeping, not operation.
/// (Specifically, in `Services::expire_old_data`, which only ever
/// discards entries that are in the `Closed` state.)
///
/// The starting point for this interval is the last time we used the data,
/// or a circuit derived from it.
///
/// Note that this is a *maximum* for the length of time we will retain a descriptor;
/// HS descriptors' lifetimes (as declared in the descriptor) *are* honoured;
/// but that's done by the code in `connect.rs`, not here.
///
/// The value here is 48 hours.
/// We're not sure this is the right value.
/// See <https://gitlab.torproject.org/tpo/core/arti/-/issues/916>
//
// TODO SPEC: State how long IPT and descriptor data should be retained after use
//
// TODO HS CFG: Perhaps this should be configurable somehow?
const RETAIN_DATA_AFTER_LAST_USE: Duration = Duration::from_secs(48 * 3600 /*hours*/);
124
/// Hidden services, our connections to them, and history of connections, etc.
///
/// Table containing state of our ideas about services.
/// Data structure is keyed (indexed) by:
///  * `HsId`, hidden service identity
///  * any secret keys we are to use
///  * circuit isolation
///
/// We treat different values for any of the above as completely independent,
/// except that we try isolation joining (narrowing) if everything else matches.
///
/// In other words,
///  * Two HS connection requests cannot share state and effort
///    (descriptor downloads, descriptors, intro pt history)
///    unless the restricted discovery keys to be used are the same.
///  * This criterion is checked before looking at isolations,
///    which may further restrict sharing:
///    Two HS connection requests will only share state subject to isolations.
///
/// Here "state and effort" includes underlying circuits such as hsdir circuits,
/// since each HS connection state will use `launch_specific_isolated` for those.
#[derive(Default, Debug)]
pub(crate) struct Services<D: MockableConnectorData> {
    /// The actual records of our connections/attempts for each service,
    /// separated according to the keys described above
    ///
    /// Each entry can also be addressed directly by its `TableIndex`.
    records: isol_map::MultikeyIsolatedMap<TableIndex, HsId, HsClientSecretKeys, ServiceState<D>>,

    /// Configuration
    ///
    /// `Arc` so that it can be shared with individual hs connector tasks
    config: Arc<Config>,
}
156
/// Entry in the 2nd-level lookup array
///
/// Pairs the `HsClientSecretKeys` for a request with its [`ServiceState`].
#[allow(dead_code)] // This alias is here for documentation if nothing else
type ServiceRecord<D> = isol_map::Record<HsClientSecretKeys, ServiceState<D>>;
160
/// Value in the `Services` data structure
///
/// State and history of our connections, including connection to any connection task.
///
/// `last_used` is used to expire data eventually.
//
// TODO unify this with channels and circuits.  See arti#778.
#[derive(Educe)]
#[educe(Debug)]
enum ServiceState<D: MockableConnectorData> {
    /// We don't have a circuit
    Closed {
        /// The state
        data: D,
        /// Last time we touched this, including reuse
        last_used: Instant,
    },
    /// We have an open circuit, which we can (hopefully) just use
    Open {
        /// The state
        data: D,
        /// The circuit
        #[educe(Debug(ignore))]
        tunnel: Arc<D::DataTunnel>,
        /// Last time we touched this, including reuse
        ///
        /// This is set when we created the circuit, and updated when we
        /// hand out this circuit again in response to a new request.
        ///
        /// We believe this mirrors C Tor behaviour;
        /// see [`RETAIN_CIRCUIT_AFTER_LAST_USE`].
        last_used: Instant,
        /// We have a task that will close the circuit when required
        ///
        /// This field serves to require construction sites of Open
        /// to demonstrate that there *is* an expiry task.
        /// In the future, it may also serve to cancel old expiry tasks.
        circuit_expiry_task: CircuitExpiryTask,
    },
    /// We have a task trying to find the service and establish the circuit
    ///
    /// The per-service data (the `data: D` of the other variants) is owned by the task
    /// while we are in this state, which is why this variant has no `data` field.
    Working {
        /// Signals instances of `get_or_launch_connection` when the task completes
        barrier_recv: postage::barrier::Receiver,
        /// Where the task will store the error.
        ///
        /// Lock hierarchy: this lock is "inside" the big lock on `Services`.
        error: Arc<Mutex<Option<ConnError>>>,
    },
    /// Dummy value for use with temporary mem replace
    ///
    /// Should never be observed outside the code that put it there;
    /// code which does find it reports a bug.
    Dummy,
}
214
215impl<D: MockableConnectorData> ServiceState<D> {
216 /// Make a new (blank) `ServiceState::Closed`
217 fn blank(runtime: &impl Runtime) -> Self {
218 ServiceState::Closed {
219 data: D::default(),
220 last_used: runtime.now(),
221 }
222 }
223}
224
/// "Continuation" return type from `obtain_circuit_or_continuation_info`
///
/// Consists of the shared slot where the running connection task will store any error,
/// and a barrier receiver which is signalled when that task finishes.
type Continuation = (Arc<Mutex<Option<ConnError>>>, postage::barrier::Receiver);
227
/// Represents a task which is waiting to see when the circuit needs to be expired
///
/// Currently this is an empty token: it carries no handle to the task.
///
/// TODO: Replace this with a task handle that cancels the task when dropped.
/// Until then, if the circuit is closed before then, the expiry task will
/// uselessly wake up some time later.
#[derive(Debug)] // Not Clone
struct CircuitExpiryTask {}
// impl Drop already, partly to allow explicit drop(CircuitExpiryTask) without clippy complaint
impl Drop for CircuitExpiryTask {
    // Deliberately does nothing (see the TODO on the struct).
    fn drop(&mut self) {}
}
239
240/// Obtain a circuit from the `Services` table, or return a continuation
241///
242/// This is the workhorse function for `get_or_launch_connection`.
243///
244/// `get_or_launch_connection`, together with `obtain_circuit_or_continuation_info`,
245/// form a condition variable loop:
246///
247/// We check to see if we have a circuit. If so, we return it.
248/// Otherwise, we make sure that a circuit is being constructed,
249/// and then go into a condvar wait;
250/// we'll be signaled when the construction completes.
251///
252/// So the connection task we spawn does not return the circuit, or error,
253/// via an inter-task stream.
254/// It stores it in the data structure and wakes up all the client tasks.
255/// (This means there is only one success path for the client task code.)
256///
257/// There are some wrinkles:
258///
259/// ### Existence of this as a separate function
260///
261/// The usual structure for a condition variable loop would be something like this:
262///
263/// ```rust,ignore
264/// loop {
265/// test state and maybe break;
266/// cv.wait(guard).await; // consumes guard, unlocking after enqueueing us as a waiter
267/// guard = lock();
268/// }
269/// ```
270///
271/// However, Rust does not currently understand that the mutex is not
272/// actually a captured variable held across an await point,
273/// when the variable is consumed before the await, and re-stored afterwards.
274/// As a result, the async future becomes erroneously `!Send`:
275/// <https://github.com/rust-lang/rust/issues/104883>.
276/// We want the unstable feature `-Zdrop-tracking`:
277/// <https://github.com/rust-lang/rust/issues/97331>.
278///
279/// Instead, to convince the compiler, we must use a scope-based drop of the mutex guard.
280/// That means converting the "test state and maybe break" part into a sub-function.
281/// That's what this function is.
282///
283/// It returns `Right` if the loop should be exited, returning the circuit to the caller.
284/// It returns `Left` if the loop needs to do a condition variable wait.
285///
286/// ### We're using a barrier as a condition variable
287///
288/// We want to be signaled when the task exits. Indeed, *only* when it exits.
289/// This functionality is most conveniently in a `postage::barrier`.
290///
291/// ### Nested loops
292///
293/// Sometimes we want to go round again *without* unlocking.
294/// Sometimes we must unlock and wait and relock.
295///
296/// The drop tracking workaround (see above) means we have to do these two
297/// in separate scopes.
298/// So there are two nested loops: one here, and one in `get_or_launch_connection`.
299/// They both use the same backstop rechecks counter.
300fn obtain_circuit_or_continuation_info<D: MockableConnectorData>(
301 connector: &HsClientConnector<impl Runtime, D>,
302 netdir: &Arc<NetDir>,
303 hsid: &HsId,
304 secret_keys: &HsClientSecretKeys,
305 table_index: TableIndex,
306 rechecks: &mut impl Iterator,
307 mut guard: MutexGuard<'_, Services<D>>,
308) -> Result<Either<Continuation, Arc<D::DataTunnel>>, ConnError> {
309 let blank_state = || ServiceState::blank(&connector.runtime);
310
311 for _recheck in rechecks {
312 let record = guard
313 .records
314 .by_index_mut(table_index)
315 .ok_or_else(|| internal!("guard table entry vanished!"))?;
316 let state = &mut **record;
317
318 trace!("HS conn state: {state:?}");
319
320 let (data, barrier_send) = match state {
321 ServiceState::Open {
322 data: _,
323 tunnel,
324 last_used,
325 circuit_expiry_task: _,
326 } => {
327 let now = connector.runtime.now();
328 if !D::tunnel_is_ok(tunnel) {
329 // Well that's no good, we need a fresh one, but keep the data
330 let data = match mem::replace(state, ServiceState::Dummy) {
331 ServiceState::Open {
332 data,
333 last_used: _,
334 tunnel: _,
335 circuit_expiry_task: _,
336 } => data,
337 _ => panic!("state changed between matches"),
338 };
339 *state = ServiceState::Closed {
340 data,
341 last_used: now,
342 };
343 continue;
344 }
345 *last_used = now;
346 // No need to tell expiry task about revised expiry time;
347 // it will see the new last_used when it wakes up at the old expiry time.
348
349 return Ok::<_, ConnError>(Right(tunnel.clone()));
350 }
351 ServiceState::Working {
352 barrier_recv,
353 error,
354 } => {
355 if !matches!(
356 barrier_recv.try_recv(),
357 Err(postage::stream::TryRecvError::Pending)
358 ) {
359 // This information is stale; the task no longer exists.
360 // We want information from a fresh task.
361 *state = blank_state();
362 continue;
363 }
364 let barrier_recv = barrier_recv.clone();
365
366 // This clone of the error field Arc<Mutex<..>> allows us to collect errors
367 // which happened due to the currently-running task, which we have just
368 // found exists. Ie, it will see errors that occurred after we entered
369 // `get_or_launch`. Stale errors, from previous tasks, were cleared above.
370 let error = error.clone();
371
372 // Wait for the task to complete (at which point it drops the barrier)
373 return Ok(Left((error, barrier_recv)));
374 }
375 ServiceState::Closed { .. } => {
376 let (barrier_send, barrier_recv) = postage::barrier::channel();
377 let data = match mem::replace(
378 state,
379 ServiceState::Working {
380 barrier_recv,
381 error: Arc::new(Mutex::new(None)),
382 },
383 ) {
384 ServiceState::Closed { data, .. } => data,
385 _ => panic!("state changed between matches"),
386 };
387 (data, barrier_send)
388 }
389 ServiceState::Dummy => {
390 *state = blank_state();
391 return Err(internal!("HS connector found dummy state").into());
392 }
393 };
394
395 // Make a connection
396 let runtime = &connector.runtime;
397 let connector = (*connector).clone();
398 let config = guard.config.clone();
399 let netdir = netdir.clone();
400 let secret_keys = secret_keys.clone();
401 let hsid = *hsid;
402 let connect_future = async move {
403 let mut data = data;
404
405 let got = AssertUnwindSafe(D::connect(
406 &connector,
407 netdir,
408 config,
409 hsid,
410 &mut data,
411 secret_keys,
412 ))
413 .catch_unwind()
414 .await
415 .unwrap_or_else(|_| {
416 data = D::default();
417 Err(internal!("hidden service connector task panicked!").into())
418 });
419 let now = connector.runtime.now();
420 let last_used = now;
421
422 let got = got.and_then(|circuit| {
423 let circuit_expiry_task = ServiceState::spawn_circuit_expiry_task(
424 &connector,
425 hsid,
426 table_index,
427 last_used,
428 now,
429 )
430 .map_err(|cause| ConnError::Spawn {
431 spawning: "circuit expiry task",
432 cause: cause.into(),
433 })?;
434 Ok((circuit, circuit_expiry_task))
435 });
436
437 let got_error = got.as_ref().map(|_| ()).map_err(Clone::clone);
438
439 // block for handling inability to store
440 let stored = async {
441 let mut guard = connector.services()?;
442 let record = guard
443 .records
444 .by_index_mut(table_index)
445 .ok_or_else(|| internal!("HS table entry removed while task running"))?;
446 // Always match this, so we check what we're overwriting
447 let state = &mut **record;
448 let error_store = match state {
449 ServiceState::Working { error, .. } => error,
450 _ => return Err(internal!("HS task found state other than Working")),
451 };
452
453 match got {
454 Ok((tunnel, circuit_expiry_task)) => {
455 *state = ServiceState::Open {
456 data,
457 tunnel: Arc::new(tunnel),
458 last_used,
459 circuit_expiry_task,
460 }
461 }
462 Err(error) => {
463 let mut error_store = error_store
464 .lock()
465 .map_err(|_| internal!("Working error poisoned, cannot store error"))?;
466 *error_store = Some(error);
467 }
468 };
469
470 Ok(())
471 }
472 .await;
473
474 match (got_error, stored) {
475 (Ok::<(), ConnError>(()), Ok::<(), Bug>(())) => {}
476 (Err(got_error), Ok(())) => {
477 debug_report!(
478 got_error,
479 "HS connection failure for {}",
480 hsid.display_redacted()
481 );
482 }
483 (Ok(()), Err(bug)) => {
484 error_report!(
485 bug,
486 "internal error storing built HS circuit for {}",
487 hsid.display_redacted()
488 );
489 }
490 (Err(got_error), Err(bug)) => {
491 // We're reporting two errors, so we'll construct the event
492 // manually.
493 error!(
494 "internal error storing HS connection error for {}: {}; {}",
495 hsid.display_redacted(),
496 got_error.report(),
497 bug.report(),
498 );
499 }
500 };
501 drop(barrier_send);
502 };
503 runtime
504 .spawn_obj(Box::new(connect_future).into())
505 .map_err(|cause| ConnError::Spawn {
506 spawning: "connection task",
507 cause: cause.into(),
508 })?;
509 }
510
511 Err(internal!("HS connector state management malfunction (exceeded MAX_RECHECKS").into())
512}
513
514impl<D: MockableConnectorData> Services<D> {
515 /// Create a new empty `Services`
516 pub(crate) fn new(config: Config) -> Self {
517 Services {
518 records: Default::default(),
519 config: Arc::new(config),
520 }
521 }
522
    /// Connect to a hidden service
    ///
    /// Finds (or creates) the table entry for `hs_id`, `secret_keys` and `isolation`,
    /// and then loops until `obtain_circuit_or_continuation_info` yields a tunnel
    /// or an error.
    ///
    /// Whenever a connection task is already running, we wait for it to finish.
    /// If it left an error behind, we return a clone of that error;
    /// otherwise we re-lock the table and look again.
    // We *do* drop guard.  There is *one* await point, just after drop(guard).
    #[instrument(skip_all, level = "trace")]
    pub(crate) async fn get_or_launch_connection(
        connector: &HsClientConnector<impl Runtime, D>,
        netdir: &Arc<NetDir>,
        hs_id: HsId,
        isolation: Box<dyn Isolation>,
        secret_keys: HsClientSecretKeys,
    ) -> Result<Arc<D::DataTunnel>, ConnError> {
        let blank_state = || ServiceState::blank(&connector.runtime);

        // Backstop counter, shared by this loop and the one in
        // `obtain_circuit_or_continuation_info`.
        let mut rechecks = 0..MAX_RECHECKS;

        let mut obtain = |table_index, guard| {
            obtain_circuit_or_continuation_info(
                connector,
                netdir,
                &hs_id,
                &secret_keys,
                table_index,
                &mut rechecks,
                guard,
            )
        };

        let mut got;
        let table_index;
        {
            let mut guard = connector.services()?;
            let services = &mut *guard;

            trace!("HS conn get_or_launch: {hs_id:?} {isolation:?} {secret_keys:?}");
            //trace!("HS conn services: {services:?}");

            table_index =
                services
                    .records
                    .index_or_insert_with(&hs_id, &secret_keys, isolation, blank_state);

            // `obtain` consumes the guard, so the lock is released when it returns.
            let guard = guard;
            got = obtain(table_index, guard);
        }
        loop {
            // The parts of this loop which run after a `Left` is returned
            // logically belong in the case in `obtain_circuit_or_continuation_info`
            // for `ServiceState::Working`, where that function decides we need to wait.
            // This code has to be out here to help the compiler's drop tracking.
            {
                // Block to scope the acquisition of `error`, a guard
                // for the mutex-protected error field in the state,
                // and, for neatness, barrier_recv.

                let (error, mut barrier_recv) = match got? {
                    Right(ret) => return Ok(ret),
                    Left(continuation) => continuation,
                };

                // The only await point; no lock is held here.
                barrier_recv.recv().await;

                let error = error
                    .lock()
                    .map_err(|_| internal!("Working error poisoned"))?;
                if let Some(error) = &*error {
                    return Err(error.clone());
                }
            }

            // The task finished without recording an error: look at the table again.
            let guard = connector.services()?;

            got = obtain(table_index, guard);
        }
    }
596
597 /// Perform housekeeping - delete data we aren't interested in any more
598 pub(crate) fn run_housekeeping(&mut self, now: Instant) {
599 self.expire_old_data(now);
600 }
601
602 /// Delete data we aren't interested in any more
603 fn expire_old_data(&mut self, now: Instant) {
604 self.records
605 .retain(|hsid, record, _table_index| match &**record {
606 ServiceState::Closed { data: _, last_used } => {
607 let Some(expiry_time) = last_used.checked_add(RETAIN_DATA_AFTER_LAST_USE)
608 else {
609 return false;
610 };
611 now <= expiry_time
612 }
613 ServiceState::Open { .. } | ServiceState::Working { .. } => true,
614 ServiceState::Dummy => {
615 error!(
616 "bug: found dummy data during HS housekeeping, for {}",
617 hsid.display_redacted()
618 );
619 false
620 }
621 });
622 }
623}
624
impl<D: MockableConnectorData> ServiceState<D> {
    /// Spawn a task that will drop our reference to the rendezvous circuit
    /// at `table_index` when it has gone too long without any use.
    ///
    /// According to [`RETAIN_CIRCUIT_AFTER_LAST_USE`].
    ///
    /// `hsid` is used only for the log message emitted on expiry.
    ///
    /// Returns a [`CircuitExpiryTask`] token on success,
    /// or the `SpawnError` if the task could not be spawned.
    //
    // As it happens, this function is always called with `last_used` equal to `now`,
    // but we pass separate arguments for clarity.
    fn spawn_circuit_expiry_task(
        connector: &HsClientConnector<impl Runtime, D>,
        hsid: HsId,
        table_index: TableIndex,
        last_used: Instant,
        now: Instant,
    ) -> Result<CircuitExpiryTask, SpawnError> {
        /// Returns the duration until expiry, or `None` if it should expire now
        ///
        /// Also returns `None` (after logging) if the expiry time is not representable.
        fn calculate_expiry_wait(last_used: Instant, now: Instant) -> Option<Duration> {
            let expiry = last_used
                .checked_add(RETAIN_CIRCUIT_AFTER_LAST_USE)
                .or_else(|| {
                    error!("bug: time overflow calculating HS circuit expiry, killing circuit!");
                    None
                })?;
            // An expiry time in the past yields a zero wait, which means "expire now".
            let wait = expiry.checked_duration_since(now).unwrap_or_default();
            if wait == Duration::ZERO {
                return None;
            }
            Some(wait)
        }

        let mut maybe_wait = calculate_expiry_wait(last_used, now);
        let () = connector.runtime.spawn({
            let connector = connector.clone();
            async move {
                // This loop is slightly odd.  The wait ought naturally to be at the end,
                // but that would mean a useless re-lock and re-check right after creation,
                // or jumping into the middle of the loop.
                loop {
                    if let Some(yes_wait) = maybe_wait {
                        connector.runtime.sleep(yes_wait).await;
                    }
                    // If it's None, we can't rely on that to say we should expire it,
                    // since that information crossed a time when we didn't hold the lock.

                    // Give up quietly if the lock is unusable or the entry is gone.
                    let Ok(mut guard) = connector.services() else {
                        break;
                    };
                    let Some(record) = guard.records.by_index_mut(table_index) else {
                        break;
                    };
                    let state = &mut **record;
                    let last_used = match state {
                        ServiceState::Closed { .. } => break,
                        ServiceState::Open { last_used, .. } => *last_used,
                        ServiceState::Working { .. } => break, // someone else will respawn
                        ServiceState::Dummy => break,          // someone else will (report and) fix
                    };
                    // Recalculate using the current `last_used`, which may have been
                    // updated by reuse of the circuit while we were asleep.
                    maybe_wait = calculate_expiry_wait(last_used, connector.runtime.now());
                    if maybe_wait.is_none() {
                        match mem::replace(state, ServiceState::Dummy) {
                            ServiceState::Open {
                                data,
                                tunnel: circuit,
                                last_used,
                                circuit_expiry_task,
                            } => {
                                debug!("HS connection expires: {hsid:?}");
                                drop(circuit);
                                drop(circuit_expiry_task); // that's us
                                // Keep the data, and the original `last_used`.
                                *state = ServiceState::Closed { data, last_used };
                                break;
                            }
                            _ => panic!("state now {state:?} even though we just saw it Open"),
                        }
                    }
                }
            }
        })?;
        Ok(CircuitExpiryTask {})
    }
}
706
/// Mocking for actual HS connection work, to let us test the `Services` state machine
///
/// A value of the implementing type is the per-service data stored in [`ServiceState`];
/// a fresh entry starts out with the `Default` value.
//
// Does *not* mock circmgr, chanmgr, etc. - those won't be used by the tests, since our
// `connect` won't call them.  But mocking them pollutes many types with `R` and is
// generally tiresome.  So let's not.  Instead the tests can make dummy ones.
//
// This trait is actually crate-private, since it isn't re-exported, but it must
// be `pub` because it appears as a default for a type parameter in HsClientConnector.
#[async_trait]
pub trait MockableConnectorData: Default + Debug + Send + Sync + 'static {
    /// Client circuit
    ///
    /// This is what a successful `connect` produces, and what callers of
    /// `get_or_launch_connection` receive (wrapped in an `Arc`).
    type DataTunnel: Sync + Send + 'static;

    /// Mock state
    type MockGlobalState: Clone + Sync + Send + 'static;

    /// Connect
    ///
    /// Attempts to establish a tunnel to the service `hsid`.
    /// `data` is this service's per-service data, which the implementation may update.
    async fn connect<R: Runtime>(
        connector: &HsClientConnector<R, Self>,
        netdir: Arc<NetDir>,
        config: Arc<Config>,
        hsid: HsId,
        data: &mut Self,
        secret_keys: HsClientSecretKeys,
    ) -> Result<Self::DataTunnel, ConnError>;

    /// Is circuit OK?  Ie, not `.is_closing()`.
    ///
    /// If this returns `false` for a stored tunnel, that tunnel is discarded
    /// and a new connection attempt is made.
    fn tunnel_is_ok(tunnel: &Self::DataTunnel) -> bool;
}
736
737#[cfg(test)]
738pub(crate) mod test {
739 // @@ begin test lint list maintained by maint/add_warning @@
740 #![allow(clippy::bool_assert_comparison)]
741 #![allow(clippy::clone_on_copy)]
742 #![allow(clippy::dbg_macro)]
743 #![allow(clippy::mixed_attributes_style)]
744 #![allow(clippy::print_stderr)]
745 #![allow(clippy::print_stdout)]
746 #![allow(clippy::single_char_pattern)]
747 #![allow(clippy::unwrap_used)]
748 #![allow(clippy::unchecked_duration_subtraction)]
749 #![allow(clippy::useless_vec)]
750 #![allow(clippy::needless_pass_by_value)]
751 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
752 use super::*;
753 use crate::*;
754 use futures::{SinkExt, poll};
755 use std::fmt;
756 use std::task::Poll::{self, *};
757 use tokio::pin;
758 use tokio_crate as tokio;
759 use tor_memquota::ArcMemoryQuotaTrackerExt as _;
760 use tor_proto::memquota::ToplevelAccount;
761 use tor_rtcompat::{SleepProvider, test_with_one_runtime};
762 use tor_rtmock::MockRuntime;
763 use tracing_test::traced_test;
764
765 use ConnError as E;
766
    /// Per-service mock data, standing in for the real connector data
    #[derive(Debug, Default)]
    struct MockData {
        /// Number of times the mock `connect` has been called with this data
        connect_called: usize,
    }
771
    /// Type indicating what our `connect()` should return; it always makes a fresh MockCirc
    ///
    /// `Pending` means `connect()` should wait until a `Ready` value is sent.
    type MockGive = Poll<Result<(), E>>;
774
    /// Mock state shared by all connection attempts made through one connector
    #[derive(Debug, Clone)]
    struct MockGlobalState {
        // things will appear here when we have more sophisticated tests
        /// Tells the mock `connect()` what to return (see [`MockGive`])
        give: postage::watch::Receiver<MockGive>,
    }
780
    /// Mock tunnel, as produced by the mock `connect()`
    ///
    /// Two `MockTunnel`s compare equal only if they share the same `ok` allocation.
    #[derive(Clone, Educe)]
    #[educe(Debug)]
    struct MockTunnel {
        /// Whether `tunnel_is_ok` should report this tunnel as usable
        #[educe(Debug(method = "debug_arc_mutex"))]
        ok: Arc<Mutex<bool>>,
        /// Value of `MockData::connect_called` when this tunnel was made
        connect_called: usize,
    }
788
789 fn debug_arc_mutex(val: &Arc<Mutex<impl Debug>>, f: &mut fmt::Formatter) -> fmt::Result {
790 write!(f, "@{:?}", Arc::as_ptr(val))?;
791 let guard = val.lock();
792 let guard = guard.or_else(|g| {
793 write!(f, ",POISON")?;
794 Ok::<_, fmt::Error>(g.into_inner())
795 })?;
796 write!(f, " ")?;
797 Debug::fmt(&*guard, f)
798 }
799
800 impl PartialEq for MockTunnel {
801 fn eq(&self, other: &MockTunnel) -> bool {
802 Arc::ptr_eq(&self.ok, &other.ok)
803 }
804 }
805
806 impl MockTunnel {
807 fn new(connect_called: usize) -> Self {
808 let ok = Arc::new(Mutex::new(true));
809 MockTunnel { ok, connect_called }
810 }
811 }
812
813 #[async_trait]
814 impl MockableConnectorData for MockData {
815 type DataTunnel = MockTunnel;
816 type MockGlobalState = MockGlobalState;
817
818 async fn connect<R: Runtime>(
819 connector: &HsClientConnector<R, MockData>,
820 _netdir: Arc<NetDir>,
821 _config: Arc<Config>,
822 _hsid: HsId,
823 data: &mut MockData,
824 _secret_keys: HsClientSecretKeys,
825 ) -> Result<Self::DataTunnel, E> {
826 data.connect_called += 1;
827 let make = {
828 let connect_called = data.connect_called;
829 move |()| MockTunnel::new(connect_called)
830 };
831 let mut give = connector.mock_for_state.give.clone();
832 if let Ready(ret) = &*give.borrow() {
833 return ret.clone().map(make);
834 }
835 loop {
836 match give.recv().await.expect("EOF on mock_global_state stream") {
837 Pending => {}
838 Ready(ret) => return ret.map(make),
839 }
840 }
841 }
842
843 fn tunnel_is_ok(circuit: &Self::DataTunnel) -> bool {
844 *circuit.ok.lock().unwrap()
845 }
846 }
847
848 /// Makes a non-empty `HsClientSecretKeys`, containing (somehow) `kk`
849 fn mk_keys(kk: u8) -> HsClientSecretKeys {
850 let mut ss = [0_u8; 32];
851 ss[0] = kk;
852 let keypair = tor_llcrypto::pk::ed25519::Keypair::from_bytes(&ss);
853 let mut b = HsClientSecretKeysBuilder::default();
854 #[allow(deprecated)]
855 b.ks_hsc_intro_auth(keypair.into());
856 b.build().unwrap()
857 }
858
859 fn mk_hsconn<R: Runtime>(
860 runtime: R,
861 ) -> (
862 HsClientConnector<R, MockData>,
863 HsClientSecretKeys,
864 postage::watch::Sender<MockGive>,
865 ) {
866 let chanmgr = tor_chanmgr::ChanMgr::new(
867 runtime.clone(),
868 &Default::default(),
869 tor_chanmgr::Dormancy::Dormant,
870 &Default::default(),
871 ToplevelAccount::new_noop(),
872 None,
873 );
874 let guardmgr = tor_guardmgr::GuardMgr::new(
875 runtime.clone(),
876 tor_persist::TestingStateMgr::new(),
877 &tor_guardmgr::TestConfig::default(),
878 )
879 .unwrap();
880
881 let circmgr = Arc::new(
882 tor_circmgr::CircMgr::new(
883 &tor_circmgr::TestConfig::default(),
884 tor_persist::TestingStateMgr::new(),
885 &runtime,
886 Arc::new(chanmgr),
887 &guardmgr,
888 )
889 .unwrap(),
890 );
891 let circpool = Arc::new(HsCircPool::new(&circmgr));
892 let (give_send, give) = postage::watch::channel_with(Ready(Ok(())));
893 let mock_for_state = MockGlobalState { give };
894 #[allow(clippy::let_and_return)] // we'll probably add more in this function
895 let hscc = HsClientConnector {
896 runtime,
897 circpool,
898 services: Default::default(),
899 mock_for_state,
900 };
901 let keys = HsClientSecretKeysBuilder::default().build().unwrap();
902 (hscc, keys, give_send)
903 }
904
905 #[allow(clippy::unnecessary_wraps)]
906 fn mk_isol(s: &str) -> Option<NarrowableIsolation> {
907 Some(NarrowableIsolation(s.into()))
908 }
909
910 async fn launch_one(
911 hsconn: &HsClientConnector<impl Runtime, MockData>,
912 id: u8,
913 secret_keys: &HsClientSecretKeys,
914 isolation: Option<NarrowableIsolation>,
915 ) -> Result<Arc<MockTunnel>, ConnError> {
916 let netdir = tor_netdir::testnet::construct_netdir()
917 .unwrap_if_sufficient()
918 .unwrap();
919 let netdir = Arc::new(netdir);
920
921 let hs_id = {
922 let mut hs_id = [0_u8; 32];
923 hs_id[0] = id;
924 hs_id.into()
925 };
926 #[allow(clippy::redundant_closure)] // srsly, that would be worse
927 let isolation = isolation.unwrap_or_default().into();
928 Services::get_or_launch_connection(hsconn, &netdir, hs_id, isolation, secret_keys.clone())
929 .await
930 }
931
932 #[derive(Default, Debug, Clone)]
933 // TODO move this to tor-circmgr under a test feature?
934 pub(crate) struct NarrowableIsolation(pub(crate) String);
935 impl tor_circmgr::isolation::IsolationHelper for NarrowableIsolation {
936 fn compatible_same_type(&self, other: &Self) -> bool {
937 self.join_same_type(other).is_some()
938 }
939 fn join_same_type(&self, other: &Self) -> Option<Self> {
940 Some(if self.0.starts_with(&other.0) {
941 self.clone()
942 } else if other.0.starts_with(&self.0) {
943 other.clone()
944 } else {
945 return None;
946 })
947 }
948 }
949
950 #[test]
951 #[traced_test]
952 fn simple() {
953 test_with_one_runtime!(|runtime| async {
954 let (hsconn, keys, _give_send) = mk_hsconn(runtime);
955
956 let circuit = launch_one(&hsconn, 0, &keys, None).await.unwrap();
957 eprintln!("{:?}", circuit);
958 });
959 }
960
961 #[test]
962 #[traced_test]
963 fn expiry() {
964 MockRuntime::test_with_various(|runtime| async move {
965 // This is the amount by which we adjust clock advances to make sure we
966 // hit more or less than a particular value, to avoid edge cases and
967 // cope with real time advancing too.
968 // This does *not* represent an actual delay to real test runs.
969 const TIMEOUT_SLOP: Duration = Duration::from_secs(10);
970
971 let (hsconn, keys, _give_send) = mk_hsconn(runtime.clone());
972
973 let advance = |duration| {
974 let hsconn = hsconn.clone();
975 let runtime = &runtime;
976 async move {
977 // let expiry task get going and choose its expiry (wakeup) time
978 runtime.progress_until_stalled().await;
979 // TODO: Make this use runtime.advance_by() when that's not very slow
980 runtime.mock_sleep().advance(duration);
981 // let expiry task run
982 runtime.progress_until_stalled().await;
983 hsconn.services().unwrap().run_housekeeping(runtime.now());
984 }
985 };
986
987 // make circuit1
988 let circuit1 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
989
990 // expire it
991 advance(RETAIN_CIRCUIT_AFTER_LAST_USE + TIMEOUT_SLOP).await;
992
993 // make circuit2 (a)
994 let circuit2a = launch_one(&hsconn, 0, &keys, None).await.unwrap();
995 assert_ne!(circuit1, circuit2a);
996
997 // nearly expire it, then reuse it
998 advance(RETAIN_CIRCUIT_AFTER_LAST_USE - TIMEOUT_SLOP).await;
999 let circuit2b = launch_one(&hsconn, 0, &keys, None).await.unwrap();
1000 assert_eq!(circuit2a, circuit2b);
1001
1002 // nearly expire it again, then reuse it
1003 advance(RETAIN_CIRCUIT_AFTER_LAST_USE - TIMEOUT_SLOP).await;
1004 let circuit2c = launch_one(&hsconn, 0, &keys, None).await.unwrap();
1005 assert_eq!(circuit2a, circuit2c);
1006
1007 // actually expire it
1008 advance(RETAIN_CIRCUIT_AFTER_LAST_USE + TIMEOUT_SLOP).await;
1009 let circuit3 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
1010 assert_ne!(circuit2c, circuit3);
1011 assert_eq!(circuit3.connect_called, 3);
1012
1013 advance(RETAIN_DATA_AFTER_LAST_USE + Duration::from_secs(10)).await;
1014 let circuit4 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
1015 assert_eq!(circuit4.connect_called, 1);
1016 });
1017 }
1018
    /// Check that concurrent and subsequent requests share a connection
    /// when (and only when) service, keys and isolation permit
    #[test]
    #[traced_test]
    fn coalesce() {
        test_with_one_runtime!(|runtime| async {
            let (hsconn, keys, mut give_send) = mk_hsconn(runtime);

            // Make the mock `connect()` block until told otherwise
            give_send.send(Pending).await.unwrap();

            let c1f = launch_one(&hsconn, 0, &keys, None);
            pin!(c1f);
            for _ in 0..10 {
                assert!(poll!(&mut c1f).is_pending());
            }

            // c2f will find Working
            let c2f = launch_one(&hsconn, 0, &keys, None);
            pin!(c2f);
            for _ in 0..10 {
                assert!(poll!(&mut c1f).is_pending());
                assert!(poll!(&mut c2f).is_pending());
            }

            // Let the mock `connect()` succeed
            give_send.send(Ready(Ok(()))).await.unwrap();

            let c1 = c1f.await.unwrap();
            let c2 = c2f.await.unwrap();
            assert_eq!(c1, c2);

            // c3 will find Open
            let c3 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_eq!(c1, c3);

            // A different service, or different keys, must not share
            assert_ne!(c1, launch_one(&hsconn, 1, &keys, None).await.unwrap());
            assert_ne!(
                c1,
                launch_one(&hsconn, 0, &mk_keys(42), None).await.unwrap()
            );

            let c_isol_1 = launch_one(&hsconn, 0, &keys, mk_isol("a")).await.unwrap();
            assert_eq!(c1, c_isol_1); // We can reuse, but now we've narrowed the isol

            let c_isol_2 = launch_one(&hsconn, 0, &keys, mk_isol("b")).await.unwrap();
            assert_ne!(c1, c_isol_2);
        });
    }
1064}