Skip to main content

tor_proto/stream/
incoming.rs

1//! Incoming data stream cell handlers, shared by the relay and onion service implementations.
2
3use bitvec::prelude::*;
4use derive_deftly::Deftly;
5use oneshot_fused_workaround as oneshot;
6
7use tor_cell::relaycell::{RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg, msg};
8use tor_cell::restricted_msg;
9use tor_error::internal;
10use tor_memquota::derive_deftly_template_HasMemoryCost;
11use tor_memquota::mq_queue::{self, MpscSpec};
12use tor_rtcompat::DynTimeProvider;
13
14use crate::circuit::CircHopSyncView;
15use crate::circuit::circhop::ReactorStreamComponents;
16use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
17use crate::stream::{CloseStreamBehavior, StreamComponents};
18use crate::{Error, Result};
19
20// TODO(relay): move these to a shared module
21use crate::client::stream::DataStream;
22
23use crate::memquota::StreamAccount;
24use crate::{HopLocation, HopNum};
25
26/// A `CmdChecker` that enforces invariants for inbound data streams.
///
/// The checker is stateless. Its `CmdChecker` implementation lets DATA through
/// (the stream stays open) and END through (the stream becomes closed), and
/// reports any other relay command as a stream protocol error.
27#[derive(Debug, Default)]
28pub(crate) struct InboundDataCmdChecker;
29
30restricted_msg! {
31    /// An allowable incoming message on an incoming data stream.
    ///
    /// `InboundDataCmdChecker::consume_checked_msg` decodes each message into
    /// this type; the variants match the commands that
    /// `InboundDataCmdChecker::check_msg` accepts.
32    enum IncomingDataStreamMsg:RelayMsg {
33        // SENDME is handled by the reactor, so it is not listed here.
34        Data, End,
35    }
36}
37
38impl CmdChecker for InboundDataCmdChecker {
39    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
40        use StreamStatus::*;
41        match msg.cmd() {
42            RelayCmd::DATA => Ok(Open),
43            RelayCmd::END => Ok(Closed),
44            _ => Err(Error::StreamProto(format!(
45                "Unexpected {} on an incoming data stream!",
46                msg.cmd()
47            ))),
48        }
49    }
50
51    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
52        let _ = msg
53            .decode::<IncomingDataStreamMsg>()
54            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
55        Ok(())
56    }
57}
58
59impl InboundDataCmdChecker {
60    /// Return a new boxed `DataCmdChecker` in a state suitable for a
61    /// connection where an initial CONNECTED cell is not expected.
62    ///
63    /// This is used by hidden services, exit relays, and directory servers
64    /// to accept streams.
65    pub(crate) fn new_connected() -> AnyCmdChecker {
66        Box::new(Self)
67    }
68}
69
70/// A pending request from the other end of the circuit for us to open a new
71/// stream.
72///
73/// Exits, directory caches, and onion services expect to receive these; others
74/// do not.
75///
76/// On receiving one of these objects, the party handling it should accept it
/// (`accept_data`), reject it (`reject`), or ignore it (`discard`).
77/// If it is dropped without being explicitly handled, a reject
78/// message will be sent anyway.
79#[derive(Debug)]
80pub struct IncomingStream {
81    /// The runtime's time provider; handed to the [`DataStream`] when the
    /// request is accepted.
82    time_provider: DynTimeProvider,
83    /// The message that the client sent us to begin the stream.
84    request: IncomingStreamRequest,
85    /// Stream components used to assemble the [`DataStream`].
    ///
    /// Its `target` is also what `reject` and `discard` use to close the
    /// pending stream.
86    components: StreamComponents,
87}
88
89impl IncomingStream {
90    /// Create a new `IncomingStream`.
91    pub(crate) fn new(
92        time_provider: DynTimeProvider,
93        request: IncomingStreamRequest,
94        components: StreamComponents,
95    ) -> Self {
96        Self {
97            time_provider,
98            request,
99            components,
100        }
101    }
102
103    /// Return the underlying message that was used to try to begin this stream.
104    pub fn request(&self) -> &IncomingStreamRequest {
105        &self.request
106    }
107
108    /// Accept this stream as a new [`DataStream`], and send the client a
109    /// message letting them know the stream was accepted.
110    pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
111        let Self {
112            time_provider,
113            request,
114            components:
115                StreamComponents {
116                    mut target,
117                    stream_receiver,
118                    xon_xoff_reader_ctrl,
119                    memquota,
120                },
121        } = self;
122
123        match request {
124            IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
125                target.send(message.into()).await?;
126                Ok(DataStream::new_connected(
127                    time_provider,
128                    stream_receiver,
129                    xon_xoff_reader_ctrl,
130                    target,
131                    memquota,
132                ))
133            }
134            IncomingStreamRequest::Resolve(_) => {
135                Err(internal!("Cannot accept data on a RESOLVE stream").into())
136            }
137        }
138    }
139
140    /// Reject this request and send an error message to the client.
141    pub async fn reject(mut self, message: msg::End) -> Result<()> {
142        let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
143
144        rx.await.map_err(|_| Error::CircuitClosed)?
145    }
146
147    /// Reject this request and possibly send an error message to the client.
148    ///
149    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
150    fn reject_inner(
151        &mut self,
152        message: CloseStreamBehavior,
153    ) -> Result<oneshot::Receiver<Result<()>>> {
154        self.components.target.close_pending(message)
155    }
156
157    /// Ignore this request without replying to the client.
158    ///
159    /// (If you drop an [`IncomingStream`] without calling `accept_data`,
160    /// `reject`, or this method, the drop handler will cause it to be
161    /// rejected.)
162    pub async fn discard(mut self) -> Result<()> {
163        let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
164
165        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
166    }
167}
168
169// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
170// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
171// circuit reactor will see a close on its mpsc::Receiver, and the circuit
172// reactor will itself send an End.
173
174restricted_msg! {
175    /// The allowed incoming messages on an `IncomingStream`.
    ///
    /// `IncomingCmdChecker::consume_checked_msg` decodes into this type to
    /// validate a request.  `IncomingStream::accept_data` can turn `Begin` and
    /// `BeginDir` requests into data streams, but returns an internal error
    /// for `Resolve`.
176    #[derive(Clone, Debug, Deftly)]
177    #[derive_deftly(HasMemoryCost)]
178    #[non_exhaustive]
179    pub enum IncomingStreamRequest: RelayMsg {
180        /// A BEGIN message.
181        Begin,
182        /// A BEGIN_DIR message.
183        BeginDir,
184        /// A RESOLVE message.
185        Resolve,
186    }
187}
188
189/// Bit-vector used to represent a list of permitted commands.
190///
191/// This is cheaper and faster than using a vec, and avoids side-channel
192/// attacks.
///
/// The array has 256 bits and is indexed by a command's `u8` value, so every
/// possible command value has its own bit.
193type RelayCmdSet = bitvec::BitArr!(for 256);
194
195/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
196/// have a non-zero stream ID.
197#[derive(Debug)]
198pub(crate) struct IncomingCmdChecker {
199    /// The "begin" commands that can be received on this type of circuit:
200    ///
201    ///   * onion service circuits only accept `BEGIN`
202    ///   * all relay circuits accept `BEGIN_DIR`
203    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
204    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
205    ///     as well
    ///
    /// A set bit at index `n` means the command whose `u8` value is `n` is
    /// permitted.
206    allow_commands: RelayCmdSet,
207}
208
209impl IncomingCmdChecker {
210    /// Create a new boxed `IncomingCmdChecker`.
211    pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
212        let mut array = BitArray::ZERO;
213        for c in allow_commands {
214            array.set(u8::from(*c) as usize, true);
215        }
216        Box::new(Self {
217            allow_commands: array,
218        })
219    }
220}
221
222impl CmdChecker for IncomingCmdChecker {
223    fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
224        if self.allow_commands[u8::from(msg.cmd()) as usize] {
225            Ok(StreamStatus::Open)
226        } else {
227            Err(Error::StreamProto(format!(
228                "Unexpected {} on incoming stream",
229                msg.cmd()
230            )))
231        }
232    }
233
234    fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
235        let _ = msg
236            .decode::<IncomingStreamRequest>()
237            .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
238
239        Ok(())
240    }
241}
242
243/// A callback that can check whether a given stream request is acceptable
244/// immediately on its receipt.
245///
246/// This should only be used for checks that need to be done immediately, with a
247/// view of the state of the circuit hop the stream request arrived on.
248/// Any other checks should, if possible,
249/// be done on the [`IncomingStream`] objects as they are received.
250pub trait IncomingStreamRequestFilter: Send + 'static {
251    /// Check an incoming stream request, and decide what to do with it.
    ///
    /// `ctx` exposes the request message itself, and `circ` is a view of the
    /// circuit hop the request arrived on.
    ///
    /// On success, the returned [`IncomingStreamRequestDisposition`] says
    /// whether to accept the request, reject it with an END message, or close
    /// the circuit.
    //
    // NOTE(review): how an `Err` return is treated is up to the caller, which
    // is not visible in this file.
252    fn disposition(
253        &mut self,
254        ctx: &IncomingStreamRequestContext<'_>,
255        circ: &CircHopSyncView<'_>,
256    ) -> Result<IncomingStreamRequestDisposition>;
257}
258
259/// What action to take with an incoming stream request.
260#[derive(Clone, Debug)]
261#[non_exhaustive]
262pub enum IncomingStreamRequestDisposition {
263    /// Accept the request (for now) and pass it to the mpsc::Receiver
264    /// that is yielding them as [`IncomingStream`]s.
265    Accept,
266    /// Reject the request, and close the circuit on which it was received.
267    CloseCircuit,
268    /// Reject the request and send an END message.
269    RejectRequest(msg::End),
270}
271
272/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
273pub struct IncomingStreamRequestContext<'a> {
274    /// The request message itself, borrowed for `'a`.
275    pub(crate) request: &'a IncomingStreamRequest,
276}
277impl<'a> IncomingStreamRequestContext<'a> {
278    /// Return a reference to the message used to request this stream.
    ///
    /// The returned reference carries the full `'a` lifetime of the borrowed
    /// request, not just the lifetime of `&self`.
279    pub fn request(&self) -> &'a IncomingStreamRequest {
280        self.request
281    }
282}
283
284/// Information about an incoming stream request.
///
/// This is the item type carried by `StreamReqSender`.
285#[derive(Debug, Deftly)]
286#[derive_deftly(HasMemoryCost)]
287pub(crate) struct StreamReqInfo {
288    /// The request message itself (an [`IncomingStreamRequest`]).
289    pub(crate) req: IncomingStreamRequest,
290    /// The ID of the stream being requested.
291    pub(crate) stream_id: StreamId,
292    /// The [`HopLocation`] of the hop for this request.
293    ///
294    /// Set to `None` if we are an exit relay.
295    //
296    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
297    // incoming stream request from two separate hops.  (There is only one that's valid.)
298    pub(crate) hop: Option<HopLocation>,
299    /// The format which must be used with this stream to encode messages.
300    #[deftly(has_memory_cost(indirect_size = "0"))]
301    pub(crate) relay_cell_format: RelayCellFormat,
302    /// A collection of queues/channels that can be used to interact with this stream.
303    pub(crate) stream_components: ReactorStreamComponents,
304    /// The memory quota account to be used for this stream.
305    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
306    pub(crate) memquota: StreamAccount,
307}
308
309/// Sending half of the MPSC queue (from `tor_memquota::mq_queue`) that carries
/// [`StreamReqInfo`] values describing incoming stream requests.
///
/// Only available when the `hs-service` or `relay` feature is enabled.
310#[cfg(any(feature = "hs-service", feature = "relay"))]
311pub(crate) type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
312
313/// Data required for handling an incoming stream request.
///
/// Only available when the `hs-service` or `relay` feature is enabled.
314#[derive(educe::Educe)]
315#[educe(Debug)]
316#[cfg(any(feature = "hs-service", feature = "relay"))]
317pub(crate) struct IncomingStreamRequestHandler {
318    /// A sender for sharing information about an incoming stream request.
319    pub(crate) incoming_sender: StreamReqSender,
320    /// The hop to expect incoming stream requests from.
321    ///
322    /// Set to `None` if we are a relay.
323    pub(crate) hop_num: Option<HopNum>,
324    /// A [`CmdChecker`] for validating incoming streams.
325    pub(crate) cmd_checker: AnyCmdChecker,
326    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
327    /// this request, or wants to reject it immediately.
    ///
    /// Left out of the `Debug` output (see the `educe` attribute below).
328    #[educe(Debug(ignore))]
329    pub(crate) filter: Box<dyn IncomingStreamRequestFilter>,
330}
331
332#[cfg(test)]
333mod test {
334    // @@ begin test lint list maintained by maint/add_warning @@
335    #![allow(clippy::bool_assert_comparison)]
336    #![allow(clippy::clone_on_copy)]
337    #![allow(clippy::dbg_macro)]
338    #![allow(clippy::mixed_attributes_style)]
339    #![allow(clippy::print_stderr)]
340    #![allow(clippy::print_stdout)]
341    #![allow(clippy::single_char_pattern)]
342    #![allow(clippy::unwrap_used)]
343    #![allow(clippy::unchecked_time_subtraction)]
344    #![allow(clippy::useless_vec)]
345    #![allow(clippy::needless_pass_by_value)]
346    #![allow(clippy::string_slice)] // See arti#2571
347    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
348
349    use tor_cell::relaycell::{
350        AnyRelayMsgOuter, RelayCellFormat,
351        msg::{Begin, BeginDir, Data, Resolve},
352    };
353
354    use super::*;
355
356    #[test]
357    fn incoming_cmd_checker() {
358        // Encode a relay message as a V0 cell body, then re-read that body as
        // an `UnparsedRelayMsg`, which is what `check_msg` takes.
359        let u = |msg| {
360            let body = AnyRelayMsgOuter::new(None, msg)
361                .encode(RelayCellFormat::V0, &mut rand::rng())
362                .unwrap();
363            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
364        };
365        let begin = u(Begin::new("allium.example.com", 443, 0).unwrap().into());
366        let begin_dir = u(BeginDir::default().into());
367        let resolve = u(Resolve::new("allium.example.com").into());
368        let data = u(Data::new(&[1, 2, 3]).unwrap().into());
369
        // An empty allow-list rejects every command.
370        {
371            let mut cc_none = IncomingCmdChecker::new_any(&[]);
372            for m in [&begin, &begin_dir, &resolve, &data] {
373                assert!(cc_none.check_msg(m).is_err());
374            }
375        }
376
        // Allowing only BEGIN accepts BEGIN and rejects everything else.
377        {
378            let mut cc_begin = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
379            assert_eq!(cc_begin.check_msg(&begin).unwrap(), StreamStatus::Open);
380            for m in [&begin_dir, &resolve, &data] {
381                assert!(cc_begin.check_msg(m).is_err());
382            }
383        }
384
        // Allowing all three "begin"-style commands still rejects DATA.
385        {
386            let mut cc_any = IncomingCmdChecker::new_any(&[
387                RelayCmd::BEGIN,
388                RelayCmd::BEGIN_DIR,
389                RelayCmd::RESOLVE,
390            ]);
391            for m in [&begin, &begin_dir, &resolve] {
392                assert_eq!(cc_any.check_msg(m).unwrap(), StreamStatus::Open);
393            }
394            assert!(cc_any.check_msg(&data).is_err());
395        }
396    }
397}