tor-proto 0.41.0

Asynchronous client-side implementation of the central Tor network protocols
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
//! A relay's view of the forward (away from the client, towards the exit) state of a circuit.

use crate::channel::{Channel, ChannelSender};
use crate::circuit::CircuitRxReceiver;
use crate::circuit::UniqId;
use crate::circuit::create::{Create2Wrap, CreateHandshakeWrap};
use crate::circuit::reactor::ControlHandler;
use crate::circuit::reactor::backward::BackwardReactorCmd;
use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
use crate::circuit::reactor::hop_mgr::HopMgr;
use crate::crypto::cell::OutboundRelayLayer;
use crate::crypto::cell::RelayCellBody;
use crate::relay::RelayCircChanMsg;
use crate::util::err::ReactorError;
use crate::{Error, HopNum, Result};

// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
use crate::client::circuit::padding::QueuedCellPaddingInfo;

use crate::relay::channel_provider::{ChannelProvider, ChannelResult, OutboundChanSender};
use crate::relay::reactor::CircuitAccount;
use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
use tor_cell::relaycell::msg::{Extend2, Extended2, SendmeTag};
use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
use tor_error::{internal, into_internal, warn_report};
use tor_linkspec::decode::Strictness;
use tor_linkspec::{OwnedChanTarget, OwnedChanTargetBuilder};
use tor_rtcompat::{Runtime, SpawnExt as _};

use futures::channel::mpsc;
use futures::{SinkExt as _, StreamExt as _, future};
use tracing::{debug, trace};

use std::result::Result as StdResult;
use std::sync::Arc;
use std::task::Poll;

/// Placeholder for our custom control message type.
///
/// Currently the unit type: the relay forward reactor
/// does not define any custom control messages yet.
type CtrlMsg = ();

/// Placeholder for our custom control command type.
///
/// Currently the unit type: the relay forward reactor
/// does not define any custom control commands yet.
type CtrlCmd = ();

/// The maximum number of RELAY_EARLY cells allowed on a circuit.
///
/// Receiving more than this many RELAY_EARLY cells on a single circuit
/// is reported as a circuit protocol violation.
///
// TODO(relay): should we come up with a consensus parameter for this? (arti#2349)
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;

/// Relay-specific state for the forward reactor.
///
/// Tracks everything a relay needs in order to process cells travelling
/// away from the client: the crypto layer used to decrypt them,
/// the (optional) next-hop channel to forward them on,
/// and the bookkeeping needed to handle circuit extension.
pub(crate) struct Forward {
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// The outbound view of this circuit, if we are not the last hop.
    ///
    /// Delivers cells towards the exit.
    ///
    /// Only set for middle relays.
    /// Starts out as `None`, and is set once an EXTEND2 has been handled successfully.
    outbound: Option<Outbound>,
    /// The cryptographic state for this circuit for outbound cells
    /// (i.e. cells moving away from the client).
    ///
    /// Used for decrypting each forward RELAY/RELAY_EARLY cell,
    /// to find out whether it is addressed to us.
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
    /// A handle to a [`ChannelProvider`], used for initiating outgoing Tor channels.
    ///
    /// Note: all circuit reactors of a relay need to be initialized
    /// with the *same* underlying Tor channel provider (`ChanMgr`),
    /// to enable the reuse of existing Tor channels where possible.
    chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
    /// Whether we have received an EXTEND2 on this circuit.
    ///
    /// Set as soon as the first EXTEND2 is accepted for processing;
    /// a second EXTEND2 on the same circuit is a protocol violation.
    ///
    // TODO(relay): bools can be finicky.
    // Maybe we should combine this bool and the optional
    // outbound into a new state machine type
    // (with states Initial -> Extending -> Extended(Outbound))?
    // But should not do this if it turns out more convoluted than the bool-based approach.
    have_seen_extend2: bool,
    /// The number of RELAY_EARLY cells we have seen so far on this circuit.
    ///
    /// If we see more than [`MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`] RELAY_EARLY cells, we tear down the circuit.
    relay_early_count: usize,
    /// The sending end of the stream of events read from the main loop of the reactor.
    ///
    /// Cloned into background tasks (such as the circuit extension task),
    /// which use it to report their outcome.
    event_tx: mpsc::Sender<CircEvent>,
    /// Memory quota account
    ///
    /// Passed along when allocating the outbound circuit on the next-hop channel.
    memquota: CircuitAccount,
}

/// A type of event issued by the relay forward reactor.
///
/// Events are sent over the `event_tx` channel of [`Forward`]
/// and handed back to it for processing via `handle_event()`.
pub(crate) enum CircEvent {
    /// The outcome of an EXTEND2 request.
    ///
    /// `Ok` carries the state needed to complete the extension,
    /// `Err` the reason the extension failed.
    ExtendResult(StdResult<ExtendResult, ReactorError>),
}

/// A successful circuit extension result.
///
/// Produced by the background extension task once the next hop
/// has answered our CREATE2.
pub(crate) struct ExtendResult {
    /// The EXTENDED2 cell to send back to the client.
    extended2: Extended2,
    /// The outbound view of the circuit (channel, circuit ID and sender)
    /// to install in the forward reactor.
    outbound: Outbound,
    /// The reading end of the outbound Tor channel, if we are not the last hop.
    ///
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
    outbound_chan_rx: CircuitRxReceiver,
}

/// The outbound view of a relay circuit.
///
/// Only exists once the circuit has been extended to a next hop.
struct Outbound {
    /// The circuit identifier on the outbound Tor channel.
    circ_id: CircId,
    /// The outbound Tor channel.
    ///
    /// Also used for closing the circuit when the [`Forward`] state is dropped.
    channel: Arc<Channel>,
    /// The sending end of the outbound Tor channel.
    ///
    /// Cells we could not decrypt are forwarded to the next hop through this sender.
    outbound_chan_tx: ChannelSender,
}

/// The outcome of `decode_relay_cell`.
enum CellDecodeResult {
    /// A decrypted cell.
    ///
    /// Carries the tag returned by the crypto layer and the decoder output.
    Recognized(SendmeTag, RelayCellDecoderResult),
    /// A cell we could not decrypt.
    ///
    /// Carries the cell body as left by our crypto layer, to be forwarded to the next hop.
    //
    // NOTE(review): the variant name is misspelled (should be `Unrecognized`);
    // renaming it requires updating every use site in this module.
    Unrecognizd(RelayCellBody),
}

impl Forward {
    /// Construct the forward state for a freshly created relay circuit.
    ///
    /// The returned [`Forward`] has no outbound channel, has not seen any EXTEND2,
    /// and has not counted any RELAY_EARLY cells.
    pub(crate) fn new(
        unique_id: UniqId,
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
        event_tx: mpsc::Sender<CircEvent>,
        memquota: CircuitAccount,
    ) -> Self {
        Self {
            // State handed to us by the caller.
            unique_id,
            crypto_out,
            chan_provider,
            event_tx,
            memquota,
            // State that starts out empty: until the circuit is extended,
            // we are its last hop.
            outbound: None,
            have_seen_extend2: false,
            relay_early_count: 0,
        }
    }

    /// Decrypt `cell` and, if it is addressed to us, decode it.
    ///
    /// Returns the hop number the cell is associated with (always `None` for a relay),
    /// together with either the tag and decoded body of a recognized cell,
    /// or the body of a cell we could not decrypt.
    fn decode_relay_cell<R: Runtime>(
        &mut self,
        hop_mgr: &mut HopMgr<R>,
        cell: Relay,
    ) -> Result<(Option<HopNum>, CellDecodeResult)> {
        // Note: the client reactor will return the actual source hopnum
        let hopnum: Option<HopNum> = None;
        let cmd = cell.cmd();
        let mut body = cell.into_relay_body().into();

        let tag = match self.crypto_out.decrypt_outbound(cmd, &mut body) {
            Some(tag) => tag,
            // Not for us: hand the body back so it can be forwarded.
            None => return Ok((hopnum, CellDecodeResult::Unrecognizd(body))),
        };

        // The message is addressed to us! Now it's time to handle it...
        let mut hops = hop_mgr.hops().write().expect("poisoned lock");
        let hop = hops
            .get_mut(hopnum)
            .ok_or_else(|| internal!("msg from non-existent hop???"))?;
        let decode_res = hop.inbound.decode(body.into())?;

        Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
    }

    /// Handle a DROP message.
    #[allow(clippy::unnecessary_wraps)] // Returns Err if circ-padding is enabled
    fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
        cfg_if::cfg_if! {
            if #[cfg(feature = "circ-padding")] {
                Err(internal!("relay circuit padding not yet supported").into())
            } else {
                Ok(())
            }
        }
    }

    /// Handle an EXTEND2 cell.
    ///
    /// This spawns a background task for dealing with the circuit extension,
    /// which then reports back the result via the [`Self::event_tx`] MPSC stream.
    /// Note that this MPSC stream is polled from the `ForwardReactor` main loop,
    /// and each `CircEvent` is passed back to [`Self::handle_event()`[ for handling.
    fn handle_extend2<R: Runtime>(
        &mut self,
        runtime: &R,
        early: bool,
        msg: UnparsedRelayMsg,
    ) -> StdResult<(), ReactorError> {
        // TODO(relay): this should be allowed if the AllowNonearlyExtend consensus
        // param is set (arti#2349)
        if !early {
            return Err(Error::CircProto("got EXTEND2 in a RELAY cell?!".into()).into());
        }

        // Check if we're in the right state before parsing the EXTEND2
        if self.have_seen_extend2 {
            return Err(Error::CircProto("got 2 EXTEND2 on the same circuit?!".into()).into());
        }

        self.have_seen_extend2 = true;

        let to_bytes_err = |e| Error::from_bytes_err(e, "EXTEND2 message");

        let extend2 = msg.decode::<Extend2>().map_err(to_bytes_err)?.into_msg();

        let chan_target = OwnedChanTargetBuilder::from_encoded_linkspecs(
            Strictness::Standard,
            extend2.linkspecs(),
        )
        .map_err(|err| Error::LinkspecDecodeErr {
            object: "EXTEND2",
            err,
        })?
        .build()
        .map_err(|_| {
            // TODO: should we include the error in the circ proto error context?
            Error::CircProto("Invalid channel target".into())
        })?;

        // Note: we don't do any further validation on the EXTEND2 here,
        // under the assumption it will be handled by the ChannelProvider.

        let (chan_tx, chan_rx) = mpsc::unbounded();

        let chan_tx = OutboundChanSender(chan_tx);
        Arc::clone(&self.chan_provider).get_or_launch(self.unique_id, chan_target, chan_tx)?;

        let mut result_tx = self.event_tx.clone();
        let rt = runtime.clone();
        let unique_id = self.unique_id;
        let memquota = self.memquota.clone();

        // TODO(relay): because we dispatch this the entire EXTEND2 handling to a background task,
        // we don't really need the channel provider to send us the outcome via an MPSC channel,
        // because get_or_launch() could simply be async (it wouldn't block the reactor,
        // because it runs in another task). Maybe we need to rethink the ChannelProvider API?
        runtime
            .spawn(async move {
                let res = Self::extend_circuit(rt, unique_id, extend2, chan_rx, memquota).await;

                // Discard the error if the reactor shut down before we had
                // a chance to complete the extend handshake
                let _ = result_tx.send(CircEvent::ExtendResult(res)).await;
            })
            .map_err(into_internal!("failed to spawn extend task?!"))?;

        Ok(())
    }

    /// Process the outcome of a circuit extension attempt.
    ///
    /// On success, installs the new outbound view of the circuit and returns
    /// the command instructing the backward reactor to handle the extension.
    /// On failure, the error is propagated to the caller.
    fn handle_extend_result(
        &mut self,
        res: StdResult<ExtendResult, ReactorError>,
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
        let extension = res?;

        // From now on, unrecognized cells can be forwarded to the next hop.
        self.outbound = Some(extension.outbound);

        let cmd = BackwardReactorCmd::HandleCircuitExtended {
            hop: None,
            extended2: extension.extended2,
            outbound_chan_rx: extension.outbound_chan_rx,
        };

        Ok(Some(cmd))
    }

    /// Extend this circuit over the channel delivered on `chan_rx`.
    ///
    /// Waits for the channel provider to hand us a channel to the next hop,
    /// allocates a circuit on it, sends the CREATE2 built from `extend2`,
    /// and waits for the response.
    /// On success, returns the EXTENDED2 message for the client
    /// along with the outbound state of the circuit.
    ///
    /// Note: this gets spawned in a background task from
    /// [`Self::handle_extend2`] so as not to block the reactor main loop.
    #[allow(unused_variables)] // will become used once we implement CREATED2 timeouts
    async fn extend_circuit<R: Runtime>(
        _runtime: R,
        unique_id: UniqId,
        extend2: Extend2,
        mut chan_rx: mpsc::UnboundedReceiver<ChannelResult>,
        memquota: CircuitAccount,
    ) -> StdResult<ExtendResult, ReactorError> {
        // We expect the channel build timeout to be enforced by the ChannelProvider
        let channel = match chan_rx.next().await {
            Some(Ok(channel)) => channel,
            Some(Err(e)) => {
                warn_report!(e, "Failed to launch outgoing channel");
                // Note: retries are handled within
                // get_or_launch(), so if we receive an
                // error at this point, we need to bail
                return Err(ReactorError::Shutdown);
            }
            None => return Err(internal!("channel provider task exited").into()),
        };

        debug!(
            circ_id = %unique_id,
            "Launched channel to the next hop"
        );

        // Now that we finally have a forward Tor channel,
        // it's time to forward the onion skin and extend the circuit...
        //
        // Note: the only reason we need to await here is because internally
        // new_outbound_circ() sends a control message to the channel reactor,
        // which is handled asynchronously. In practice, we're not actually waiting on
        // the network here, so in theory we shouldn't need a timeout for this operation.
        let (circ_id, outbound_chan_rx, createdreceiver) =
            channel.new_outbound_circ(memquota).await?;

        // A circuit is now allocated in the channel's circmap:
        // build the CREATE2 from the handshake carried by the EXTEND2.
        let create2_wrap = Create2Wrap {
            handshake_type: extend2.handshake_type(),
        };
        let create2 = create2_wrap.to_chanmsg(extend2.handshake().into());
        let create2_cell = AnyChanCell::new(Some(circ_id), create2);

        // Time to write the CREATE2 to the outbound channel...
        let mut outbound_chan_tx = channel.sender();

        trace!(
            circ_id = %unique_id,
            "Sending CREATE2 to the next hop"
        );

        outbound_chan_tx.send((create2_cell, None)).await?;

        // TODO(relay): we need a timeout here, otherwise we might end up waiting forever
        // for the CREATED2 to arrive.
        //
        // There is some complexity here, see
        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3648#note_3340125
        let created_reply = createdreceiver
            .await
            .map_err(|_| internal!("channel disappeared?"))?;

        trace!(
            circ_id = %unique_id,
            "Got CREATED2 response from next hop"
        );

        let outbound = Outbound {
            circ_id,
            channel: Arc::clone(&channel),
            outbound_chan_tx,
        };

        // Reaching this point means the circuit has been extended by one hop:
        // repackage the body of the CREATED2 as an EXTENDED2
        // to send back to the client.
        let created2_body = create2_wrap.decode_chanmsg(created_reply)?;

        Ok(ExtendResult {
            extended2: Extended2::new(created2_body),
            outbound,
            outbound_chan_rx,
        })
    }

    /// Handle a RELAY or RELAY_EARLY cell (`early` tells the two apart).
    ///
    /// Enforces the per-circuit limit on RELAY_EARLY cells, then decrypts the cell.
    /// A cell we cannot decrypt is forwarded to the next hop, and `Ok(None)` is returned.
    /// A cell addressed to us is returned to the caller for further handling.
    fn handle_relay_cell<R: Runtime>(
        &mut self,
        hop_mgr: &mut HopMgr<R>,
        cell: Relay,
        early: bool,
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
        if early {
            self.relay_early_count += 1;

            let over_limit = self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT;
            if over_limit {
                return Err(
                    Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
                );
            }
        }

        let (hopnum, decoded) = self.decode_relay_cell(hop_mgr, cell)?;

        match decoded {
            CellDecodeResult::Recognized(tag, cell) => {
                Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
                    cell,
                    early,
                    hopnum,
                    tag,
                }))
            }
            CellDecodeResult::Unrecognizd(body) => {
                // Not addressed to us: pass it along to the next hop.
                self.handle_unrecognized_cell(body, None, early)?;
                Ok(None)
            }
        }
    }

    /// Handle a forward cell that we could not decrypt.
    fn handle_unrecognized_cell(
        &mut self,
        body: RelayCellBody,
        info: Option<QueuedCellPaddingInfo>,
        early: bool,
    ) -> StdResult<(), ReactorError> {
        // TODO(relay): remove this log once we add some tests
        // and confirm relaying cells works as expected
        // (in practice it will be too noisy to be useful, even at trace level).
        trace!(
            circ_id = %self.unique_id,
            "Forwarding unrecognized cell"
        );

        let Some(chan) = self.outbound.as_mut() else {
            // The client shouldn't try to send us any cells before it gets
            // an EXTENDED2 cell from us
            return Err(Error::CircProto(
                "Asked to forward cell before the circuit was extended?!".into(),
            )
            .into());
        };

        let msg = Relay::from(BoxedCellBody::from(body));
        let relay = if early {
            AnyChanMsg::RelayEarly(msg.into())
        } else {
            AnyChanMsg::Relay(msg)
        };
        let cell = AnyChanCell::new(Some(chan.circ_id), relay);

        // Note: this future is always `Ready`, because we checked the sink for readiness
        // before polling the input channel, so await won't block.
        chan.outbound_chan_tx.start_send_unpin((cell, info))?;

        Ok(())
    }

    /// Handle a TRUNCATE message.
    ///
    /// Not implemented yet: always returns an internal error.
    #[allow(clippy::unused_async)] // TODO(relay)
    async fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
        // TODO(relay): when we implement this, we should try to do better than C Tor:
        // if we have some cells queued for the next hop in the circuit,
        // we should try to flush them *before* tearing it down.
        //
        // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3487#note_3296035
        let unimplemented = internal!("TRUNCATE is not implemented");
        Err(unimplemented.into())
    }

    /// Handle a DESTROY cell originating from the client.
    ///
    /// Not implemented yet: always returns an internal error.
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
    fn handle_destroy_cell(&mut self, _cell: Destroy) -> StdResult<(), ReactorError> {
        let unimplemented = internal!("DESTROY is not implemented");
        Err(unimplemented.into())
    }

    /// Handle a PADDING_NEGOTIATE cell originating from the client.
    ///
    /// Not implemented yet: always returns an internal error.
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
    fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
        let unimplemented = internal!("PADDING_NEGOTIATE is not implemented");
        Err(unimplemented.into())
    }
}

impl ForwardHandler for Forward {
    type BuildSpec = OwnedChanTarget;
    type CircChanMsg = RelayCircChanMsg;
    type CircEvent = CircEvent;

    /// Dispatch a relay message addressed to this relay, based on its command.
    ///
    /// DROP, EXTEND2 and TRUNCATE are passed to their dedicated handlers;
    /// any other command is rejected with an internal error.
    async fn handle_meta_msg<R: Runtime>(
        &mut self,
        runtime: &R,
        early: bool,
        _hopnum: Option<HopNum>,
        msg: UnparsedRelayMsg,
        _relay_cell_format: RelayCellFormat,
    ) -> StdResult<(), ReactorError> {
        let cmd = msg.cmd();

        match cmd {
            RelayCmd::EXTEND2 => self.handle_extend2(runtime, early, msg),
            RelayCmd::TRUNCATE => self.handle_truncate().await,
            RelayCmd::DROP => self.handle_drop(),
            _ => Err(internal!("relay cmd {cmd} not supported").into()),
        }
    }

    async fn handle_forward_cell<R: Runtime>(
        &mut self,
        hop_mgr: &mut HopMgr<R>,
        cell: RelayCircChanMsg,
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
        use RelayCircChanMsg::*;

        match cell {
            Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
            RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
            Destroy(d) => {
                self.handle_destroy_cell(d)?;
                Ok(None)
            }
            PaddingNegotiate(p) => {
                self.handle_padding_negotiate(p)?;
                Ok(None)
            }
        }
    }

    /// Handle an event reported by one of our background tasks.
    ///
    /// May return a command for the backward reactor to execute.
    fn handle_event(
        &mut self,
        event: Self::CircEvent,
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
        // There is only one kind of event so far.
        let CircEvent::ExtendResult(res) = event;

        self.handle_extend_result(res)
    }

    /// Wait until the outbound channel is ready to accept a cell.
    ///
    /// Resolves immediately if the circuit has no outbound channel.
    async fn outbound_chan_ready(&mut self) -> Result<()> {
        future::poll_fn(|cx| match self.outbound.as_mut() {
            None => {
                // Pedantically, if the channel doesn't exist, it can't be ready,
                // but we have no choice here than to return Ready
                // (returning Pending would cause the reactor to lock up).
                //
                // Returning ready here means the base reactor is allowed to read
                // from its inbound channel. This is OK, because if we *do*
                // read a cell from that channel and find ourselves needing to
                // forward it to the next hop, we simply return a proto violation error,
                // shutting down the reactor.
                Poll::Ready(Ok(()))
            }
            Some(chan) => {
                let sink = &mut chan.outbound_chan_tx;

                // Drive any pending flush; its outcome is deliberately ignored.
                let _ = sink.poll_flush_unpin(cx);

                sink.poll_ready_unpin(cx)
            }
        })
        .await
    }
}

impl ControlHandler for Forward {
    type CtrlMsg = CtrlMsg;
    type CtrlCmd = CtrlCmd;

    /// Handle a control command.
    ///
    /// The command type is currently a unit placeholder, so there is nothing to do.
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
        // Destructuring the unit value stops compiling if `CtrlCmd`
        // is ever replaced with a real type (presumably a deliberate reminder
        // to implement this handler).
        let () = cmd;
        Ok(())
    }

    /// Handle a control message.
    ///
    /// The message type is currently a unit placeholder, so there is nothing to do.
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
        // Destructuring the unit value stops compiling if `CtrlMsg`
        // is ever replaced with a real type (presumably a deliberate reminder
        // to implement this handler).
        let () = msg;
        Ok(())
    }
}

impl Drop for Forward {
    fn drop(&mut self) {
        if let Some(outbound) = self.outbound.as_mut() {
            // This will send a DESTROY down the outbound channel
            let _ = outbound.channel.close_circuit(outbound.circ_id);
        }
    }
}