arti-relay 0.42.0

Library for running a relay of the Tor network
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
//! Entry point of a Tor relay: the [`TorRelay`] object.

use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};

use anyhow::Context;
use tokio::task::JoinSet;
use tor_proto::RelayChannelAuthMaterial;
use tracing::debug;
#[cfg(unix)]
use tracing::warn;

use fs_mistrust::Mistrust;
use tor_basic_utils::iter_join;
use tor_chanmgr::{ChanMgr, ChanMgrConfig, Dormancy};
use tor_config_path::CfgPathResolver;
use tor_dirmgr::DirMgrConfig;
use tor_keymgr::{ArtiNativeKeystore, KeyMgr, KeyMgrBuilder};
use tor_memquota::MemoryQuotaTracker;
use tor_netdir::params::NetParameters;
use tor_persist::state_dir::StateDirectory;
use tor_persist::{FsStateMgr, StateMgr};
use tor_proto::relay::CreateRequestHandler;
use tor_rtcompat::{NetStreamProvider, Runtime};

use crate::client::RelayClient;
use crate::config::TorRelayConfig;
use crate::tasks::channel::build_circ_net_params;
use crate::tasks::crypto::get_ntor_keys;

/// An initialized but unbootstrapped relay.
///
/// This intentionally does not have access to the runtime to prevent it from doing network io.
/// It is built by `InertTorRelay::new()` and consumed by `InertTorRelay::init()`, which takes a
/// runtime and produces a [`TorRelay`].
///
/// The idea is that we can build up the relay's components in an `InertTorRelay` without a runtime,
/// and then call `init()` on it and provide a runtime to turn it into a network-capable relay.
/// This gives us two advantages:
///
/// - We can initialize the internal data structures in the `InertTorRelay` (load the keystores,
///   configure memquota, etc), which leaves `TorRelay` to just "running" the relay (bootstrapping,
///   setting up listening sockets, etc). We don't need to combine the initialization and "running
///   the relay" all within the same object.
/// - We will likely want to share some of arti's key management subcommands in the future.
///   arti-client has an `InertTorClient` which is used so that arti subcommands can access the
///   keystore. If we do a similar thing here in arti-relay in the future, it might be nice to have
///   an `InertTorRelay` which has these internal data structures, but doesn't need a runtime or
///   have any networking capabilities.
///
/// Time will tell if this ends up being a bad design decision in practice, and we can always change
/// it later.
#[derive(Clone)]
pub(crate) struct InertTorRelay {
    /// The configuration options for the relay, exactly as passed to `new()`.
    config: TorRelayConfig,

    /// The configuration options for the client's directory manager.
    ///
    /// Built in `new()` from the storage cache directory, the storage permissions and the
    /// configured Tor network; every other setting is left at its default.
    /// It is handed over to the relay's client in `TorRelay::init()`.
    dirmgr_config: DirMgrConfig,

    /// Path resolver for expanding variables in [`CfgPath`](tor_config_path::CfgPath)s.
    ///
    /// This is the resolver that `new()` used to expand the storage paths.
    #[expect(unused)] // TODO RELAY remove
    path_resolver: CfgPathResolver,

    /// State directory path.
    ///
    /// The [`StateDirectory`] stored in `state_dir` doesn't seem to have a way of getting the state
    /// directory path, so we need to store a copy of the path here.
    #[expect(unused)] // TODO RELAY remove
    state_path: PathBuf,

    /// Relay's state directory, created from `state_path` with the storage permissions.
    #[expect(unused)] // TODO RELAY remove
    state_dir: StateDirectory,

    /// State manager for the persistent data kept under `state_path`.
    ///
    /// `new()` makes an early attempt to take its lock (without checking whether the lock was
    /// actually obtained), and `TorRelay::init()` passes it on to the relay's client.
    state_mgr: FsStateMgr,

    /// Key manager holding all relay keys and certificates.
    ///
    /// Its primary store is the native keystore found in the `keystore` subdirectory of the
    /// state directory. It is moved into the [`TorRelay`] when the relay is initialized.
    keymgr: Arc<KeyMgr>,
}

impl InertTorRelay {
    /// Build an inert relay from `config`.
    ///
    /// Storage paths found in the configuration are expanded with `path_resolver`.
    /// No runtime is involved here: this only prepares the on-disk pieces (state directory,
    /// state manager, keystore) and the directory manager configuration.
    ///
    /// # Errors
    ///
    /// Fails if a storage path can't be resolved, or if the state directory, the state manager,
    /// the attempt to lock the state, or the key manager can't be set up.
    pub(crate) fn new(
        config: TorRelayConfig,
        path_resolver: CfgPathResolver,
    ) -> anyhow::Result<Self> {
        let storage = &config.storage;
        let state_path = storage.state_dir(&path_resolver)?;
        let cache_path = storage.cache_dir(&path_resolver)?;

        // One filesystem permission policy covers everything we keep on disk.
        let mistrust = storage.permissions();

        let state_dir = StateDirectory::new(&state_path, mistrust)
            .context("Failed to create `StateDirectory`")?;
        let state_mgr = FsStateMgr::from_path_and_mistrust(&state_path, mistrust)
            .context("Failed to create `FsStateMgr`")?;

        // Make an early attempt at owning the state.
        // An `Ok` result does not promise that the lock was really acquired,
        // and at this stage we do not need to know either way.
        let _lock_status = state_mgr
            .try_lock()
            .context("Failed to try locking the state manager")?;

        let keymgr =
            Self::create_keymgr(&state_path, mistrust).context("Failed to create key manager")?;

        // Only the cache location, its trust policy and the network come from our configuration;
        // the remaining directory manager settings stay at their defaults.
        let dirmgr_config = DirMgrConfig {
            cache_dir: cache_path,
            cache_trust: mistrust.clone(),
            network: config.tor_network.clone(),
            schedule: Default::default(),
            tolerance: Default::default(),
            override_net_params: Default::default(),
            extensions: Default::default(),
        };

        let relay = Self {
            config,
            dirmgr_config,
            path_resolver,
            state_path,
            state_dir,
            state_mgr,
            keymgr,
        };
        Ok(relay)
    }

    /// Turn this [`InertTorRelay`] into a [`TorRelay`] that uses `runtime`.
    ///
    /// Any key or certificate missing from the key manager is generated first; the resulting
    /// channel authentication material is then passed on to [`TorRelay::init()`].
    ///
    /// # Errors
    ///
    /// Fails if the keys can't be generated or if [`TorRelay::init()`] fails.
    pub(crate) async fn init<R: Runtime>(self, runtime: R) -> anyhow::Result<TorRelay<R>> {
        use crate::tasks::crypto::try_generate_keys;

        let channel_auth =
            try_generate_keys(&runtime, &self.keymgr).context("Failed to generate keys")?;

        TorRelay::init(runtime, self, channel_auth).await
    }

    /// Create the [key manager](KeyMgr).
    ///
    /// The key manager's primary store is the native keystore located in the `keystore`
    /// subdirectory of `state_path`, opened with the `mistrust` permission policy.
    fn create_keymgr(state_path: &Path, mistrust: &Mistrust) -> anyhow::Result<Arc<KeyMgr>> {
        let key_store_dir = state_path.join("keystore");
        let native_store = ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, mistrust)
            .context("Failed to construct the native keystore")?;

        // Filesystem paths belong at debug level or lower,
        // unless they are part of a diagnostic message.
        debug!("Using relay keystore from {key_store_dir:?}");

        // TODO: support C-tor keystore

        KeyMgrBuilder::default()
            .primary_store(Box::new(native_store))
            .build()
            .map(Arc::new)
            .context("Failed to build the 'KeyMgr'")
    }
}

/// An active relay on the Tor network.
///
/// It is created from an [`InertTorRelay`] and a runtime by `TorRelay::init()`,
/// and driven by `TorRelay::run()`.
pub(crate) struct TorRelay<R: Runtime> {
    /// Asynchronous runtime object.
    ///
    /// Used to bind the OR listeners, and cloned into the tasks that `run()` spawns.
    runtime: R,

    /// Memory quota tracker.
    ///
    /// Created in `init()` from the `system.memory` configuration and shared with the
    /// channel manager.
    #[expect(unused)] // TODO RELAY remove
    memquota: Arc<MemoryQuotaTracker>,

    /// A "client" used by relays to construct circuits.
    ///
    /// It also gives access to the directory manager, and it is bootstrapped by `run()`.
    client: RelayClient<R>,

    /// Channel manager, used by circuits etc.
    chanmgr: Arc<ChanMgr<R>>,

    /// Handles CREATE* requests on channels.
    ///
    /// Given to the [`ChanMgr`],
    /// which gives it to each channel.
    /// We can access this handler directly to update consensus parameters or keys.
    create_request_handler: Arc<CreateRequestHandler>,

    /// See [`InertTorRelay::keymgr`].
    keymgr: Arc<KeyMgr>,

    /// Listening OR ports.
    ///
    /// Bound in `init()`, which fails if not a single address could be bound,
    /// and handed over to the OR listener task in `run()`.
    or_listeners: Vec<<R as NetStreamProvider<SocketAddr>>::Listener>,
}

impl<R: Runtime> TorRelay<R> {
    /// Create a new Tor relay with the given [`runtime`][tor_rtcompat].
    ///
    /// This is where the components get initialized and the sockets get opened.
    /// Actually doing work with those components is the job of [`TorRelay::run()`].
    ///
    /// Expected to be called from [`InertTorRelay::init()`].
    ///
    /// # Errors
    ///
    /// Fails if any component can't be built, if listening on a configured OR address fails
    /// (an unsupported address family is only warned about on unix), or if we end up with no
    /// listener at all.
    async fn init(
        runtime: R,
        inert: InertTorRelay,
        auth_material: RelayChannelAuthMaterial,
    ) -> anyhow::Result<Self> {
        let relay_config = &inert.config;

        let memquota = MemoryQuotaTracker::new(&runtime, relay_config.system.memory.clone())
            .context("Failed to initialize memquota tracker")?;

        // Channel manager.
        let chanmgr_config = ChanMgrConfig::new(relay_config.channel.clone())
            .with_my_addrs(relay_config.relay.advertise.all_addr())
            .with_auth_material(Arc::new(auth_material));
        let chanmgr = ChanMgr::new(
            runtime.clone(),
            chanmgr_config,
            Dormancy::Active,
            // TODO: Starting from the compiled-in defaults seems wrong when a newer network
            // status on disk could give a better initial value,
            // but `TorClient` does the same so let's not worry about it.
            &NetParameters::default(),
            Arc::clone(&memquota),
        )
        .context("Failed to build chan manager")?;
        let chanmgr = Arc::new(chanmgr);

        // The relay's client.
        let client = RelayClient::new(
            runtime.clone(),
            Arc::clone(&chanmgr),
            relay_config,
            relay_config,
            inert.dirmgr_config,
            inert.state_mgr,
        )
        .context("Failed to construct the relay's client")?;

        // Ingredients of the CREATE* request handler:
        // the circuit-related network status parameters, and our ntor keys.
        let circ_net_params = build_circ_net_params(client.dirmgr().params().as_ref().as_ref())
            .context("Failed to build circuit parameters for CREATE* request handler")?;
        let ntor_keys = get_ntor_keys(&inert.keymgr)
            .context("Failed to get ntor keys for CREATE* request handler")?;
        let create_request_handler = Arc::new(CreateRequestHandler::new(
            Arc::downgrade(&chanmgr) as Weak<_>,
            circ_net_params,
            ntor_keys,
        ));

        // Hook the handler into the channel manager.
        //
        // This happens only once; later updates of network parameters and keys go through
        // the `Arc` handle that we keep.
        // The `ChanMgr` holds an `Arc<CreateRequestHandler>` while
        // the `CreateRequestHandler` holds a `Weak<ChanMgr>`.
        //
        // Building the `ChanMgr` and the handler inside an `Arc::new_cyclic()`, with the handler
        // passed as part of the `ChanMgrConfig`, would technically be possible too,
        // but the code becomes a mess.
        chanmgr
            .set_create_request_handler(Arc::clone(&create_request_handler))
            .context("Failed to set the CREATE* request handler")?;

        // Bind the OR ports one address after the other (rather than concurrently with something
        // like `join_all`) so that errors are more reproducible.
        let mut or_listeners = Vec::new();
        for addr in relay_config.relay.listen.addrs() {
            match runtime.listen(addr).await {
                Ok(listener) => or_listeners.push(listener),
                // An unsupported address family (typically IPv6) only deserves a warning.
                #[cfg(unix)]
                Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
                    let message =
                        format!("Could not listen at {addr}: address family not supported");
                    if addr.is_ipv6() {
                        warn!("{message}");
                    } else {
                        // `EAFNOSUPPORT` for a non-IPv6 address is stranger, so warn louder.
                        tor_error::warn_report!(e, "{message}");
                    }
                }
                Err(e) => {
                    return Err(e).with_context(|| format!("Failed to listen at address {addr}"));
                }
            }
        }

        // A failure to listen normally made us return above, but `EAFNOSUPPORT` errors are
        // skipped, so every single address may have been skipped that way.
        if or_listeners.is_empty() {
            anyhow::bail!(
                "Could not listen at any OR port addresses: {}",
                iter_join(", ", relay_config.relay.listen.addrs()),
            );
        }

        Ok(Self {
            runtime,
            memquota,
            client,
            chanmgr,
            create_request_handler,
            keymgr: inert.keymgr,
            or_listeners,
        })
    }

    /// Run the actual relay.
    ///
    /// Spawns the relay's long-running tasks, launches and bootstraps the relay's client,
    /// and then waits for the first task to finish.
    ///
    /// This only returns if something has gone wrong.
    /// Otherwise it runs forever.
    pub(crate) async fn run(self) -> anyhow::Result<void::Void> {
        let mut task_handles = JoinSet::new();

        // Channel housekeeping task.
        {
            let mut housekeeping = crate::tasks::ChannelHouseKeepingTask::new(&self.chanmgr);
            task_handles.spawn(async move {
                housekeeping
                    .start()
                    .await
                    .context("Failed to run channel house keeping task")
            });
        }

        // Keep the CREATE* request handler in sync with new network parameters.
        {
            let create_request_handler = Arc::clone(&self.create_request_handler);
            let dir_provider = Arc::clone(self.client.dirmgr());
            task_handles.spawn(async move {
                crate::tasks::channel::update_create_request_handler_netparams(
                    create_request_handler,
                    dir_provider as Arc<_>,
                )
                .await
                .context("Failed to run create request handler update task")
            });
        }

        // Accept new Tor (OR) connections.
        {
            let runtime = self.runtime.clone();
            let chanmgr = Arc::clone(&self.chanmgr);
            let or_listeners = self.or_listeners;
            task_handles.spawn(async move {
                // TODO: Should we give all tasks a `start` method?
                crate::tasks::listeners::or_listener(runtime, chanmgr, or_listeners)
                    .await
                    .context("Failed to run OR listener task")
            });
        }

        // Key rotation.
        {
            let runtime = self.runtime.clone();
            let keymgr = Arc::clone(&self.keymgr);
            let chanmgr = Arc::clone(&self.chanmgr);
            let create_request_handler = Arc::clone(&self.create_request_handler);
            task_handles.spawn(async move {
                crate::tasks::crypto::rotate_keys_task(
                    runtime,
                    keymgr,
                    chanmgr,
                    create_request_handler,
                )
                .await
                .context("Failed to run key rotation task")
            });
        }

        // Launch the client's background tasks.
        //
        // The returned handles must stay alive until the relay stops, because dropping them
        // would stop the background tasks.
        //
        // They are `tor_rtcompat::scheduler::TaskHandle`s, which do not tell us when a task
        // stops/crashes.
        //
        // TODO: Whose responsibility is it to ensure that these background tasks don't crash?
        // Should we have a way of monitoring these tasks? Or should the circuit manager re-launch
        // crashed tasks?
        let _background_task_handles = self.client.launch_background_tasks();

        // TODO: More tasks will be spawned here.

        // With the background tasks up, bootstrap the client.
        self.client
            .bootstrap()
            .await
            .context("Failed to bootstrap the relay's client")?;

        // We block until fascism is eradicated or a task ends, which means the relay will shut
        // down and fascism will have one more chance.
        //
        // Each step below peels one layer off the result: the task set being empty, the task
        // failing to join, and finally the error the task itself reported.
        let joined = task_handles
            .join_next()
            .await
            .context("Relay task set is empty")?;
        let task_result = joined.context("Relay task join failed")?;
        let never = task_result.context("Relay task stopped unexpectedly")?;

        // We can never get here since a `Void` cannot be constructed.
        void::unreachable(never)
    }

    /// Access the relay's key manager.
    ///
    /// Returns a reference to the shared handle; clone it to keep it around.
    pub(crate) fn keymgr(&self) -> &Arc<KeyMgr> {
        &self.keymgr
    }
}