// tor-proto/client/stream/data.rs
1//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2//! for byte-oriented communication.
3
4use crate::{Error, Result};
5use static_assertions::assert_impl_all;
6use tor_cell::relaycell::msg::EndReason;
7use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::stream::StreamExt;
11use futures::task::{Context, Poll};
12use futures::{Future, Stream};
13use pin_project::pin_project;
14use postage::watch;
15
16#[cfg(feature = "tokio")]
17use tokio_crate::io::ReadBuf;
18#[cfg(feature = "tokio")]
19use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
20#[cfg(feature = "tokio")]
21use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
22use tor_cell::restricted_msg;
23
24use std::fmt::Debug;
25use std::io::Result as IoResult;
26use std::num::NonZero;
27use std::pin::Pin;
28#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
29use std::sync::Arc;
30#[cfg(feature = "stream-ctrl")]
31use std::sync::{Mutex, Weak};
32
33use educe::Educe;
34
35use crate::client::ClientTunnel;
36use crate::client::stream::StreamReceiver;
37use crate::memquota::StreamAccount;
38use crate::stream::StreamTarget;
39use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
40use crate::stream::flow_ctrl::state::StreamRateLimit;
41use crate::stream::flow_ctrl::xon_xoff::reader::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
42use crate::util::token_bucket::dynamic_writer::DynamicRateLimitedWriter;
43use crate::util::token_bucket::writer::{RateLimitedWriter, RateLimitedWriterConfig};
44use tor_basic_utils::skip_fmt;
45use tor_cell::relaycell::msg::Data;
46use tor_error::internal;
47use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
48
/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
///
/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
/// [`DataWriter`], which leaves us with this ugly type.
/// We use a type alias to make `DataWriter` a little nicer.
///
/// (The map function is spelled as a plain `fn` pointer so that the whole type
/// remains nameable; a closure's anonymous type could not be written out here.)
type RateConfigStream = futures::stream::Map<
    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
    fn(StreamRateLimit) -> RateLimitedWriterConfig,
>;
58
59/// An anonymized stream over the Tor network.
60///
61/// For most purposes, you can think of this type as an anonymized
62/// TCP stream: it can read and write data, and get closed when it's done.
63///
64/// [`DataStream`] implements [`futures::io::AsyncRead`] and
65/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
66/// traits are expected.
67///
68/// # Examples
69///
70/// Connecting to an HTTP server and sending a request, using
71/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
72///
73/// ```ignore
74/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
75///
76/// use futures::io::AsyncWriteExt;
77///
78/// stream
79///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
80///     .await?;
81///
82/// // Flushing the stream is important; see below!
83/// stream.flush().await?;
84/// ```
85///
86/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
87///
88/// ```ignore
89/// use futures::io::AsyncReadExt;
90///
91/// let mut buf = Vec::new();
92/// stream.read_to_end(&mut buf).await?;
93///
94/// println!("{}", String::from_utf8_lossy(&buf));
95/// ```
96///
97/// # Usage with Tokio
98///
99/// If the `tokio` crate feature is enabled, this type also implements
100/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
101/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
102/// with code that expects those traits.
103///
104/// # Remember to call `flush`!
105///
106/// DataStream buffers data internally, in order to write as few cells
107/// as possible onto the network.  In order to make sure that your
108/// data has actually been sent, you need to make sure that
109/// [`AsyncWrite::poll_flush`] runs to completion: probably via
110/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
111///
112/// # Splitting the type
113///
114/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
115/// `DataStream::split` method can be used to split it into those two parts, for more
116/// convenient usage with e.g. stream combinators.
117///
118/// # How long does a stream live?
119///
120/// A `DataStream` will live until all references to it are dropped,
121/// or until it is closed explicitly.
122///
123/// If you split the stream into a `DataReader` and a `DataWriter`, it
124/// will survive until _both_ are dropped, or until it is closed
125/// explicitly.
126///
127/// A stream can also close because of a network error,
128/// or because the other side of the stream decided to close it.
129///
130// # Semver note
131//
132// Note that this type is re-exported as a part of the public API of
133// the `arti-client` crate.  Any changes to its API here in
134// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataStream {
    /// Underlying writer for this stream
    w: DataWriter,
    /// Underlying reader for this stream
    r: DataReader,
    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    ///
    /// (Clones of this `Arc` are also held by the reader and writer halves,
    /// so the control object remains usable after `split()`.)
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
// Compile-time check: the stream is handed to users and must be thread-safe.
assert_impl_all! { DataStream: Send, Sync }
147
148/// An object used to control and monitor a data stream.
149///
150/// # Notes
151///
152/// This is a separate type from [`DataStream`] because it's useful to have
153/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
154/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
155/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak circuits.
    tunnel: Weak<ClientTunnel>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    //
    // (No per-field `cfg` attribute is needed here: the entire struct is
    // already gated on `feature = "stream-ctrl"`, so the attribute that used
    // to sit on this field was redundant and has been removed.)
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    _memquota: StreamAccount,
}
181
/// The inner writer for [`DataWriter`].
///
/// This type is responsible for taking bytes and packaging them into cells.
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
#[derive(Debug)]
struct DataWriterInner {
    /// Internal state for this writer
    ///
    /// This is stored in an Option so that we can mutate it in the
    /// AsyncWrite functions.  It might be possible to do better here,
    /// and we should refactor if so.
    ///
    /// Invariant: this is always `Some` except transiently, inside methods
    /// that `take()` the state and put it back before returning.
    state: Option<DataWriterState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional; see DataReaderInner.memquota
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
206
207/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
208///
209/// See the [`DataStream`] docs for more information. In particular, note
210/// that this writer requires `poll_flush` to complete in order to guarantee that
211/// all data has been written.
212///
213/// # Usage with Tokio
214///
215/// If the `tokio` crate feature is enabled, this type also implements
216/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
217/// with code that expects that trait.
218///
219/// # Drop and close
220///
221/// Note that dropping a `DataWriter` has no special effect on its own:
222/// if the `DataWriter` is dropped, the underlying stream will still remain open
223/// until the `DataReader` is also dropped.
224///
225/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
226/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
227///
228/// Remember that Tor does not support half-open streams:
229/// If you `close` or `shutdown` a stream,
230/// the other side will not see the stream as half-open,
231/// and so will (probably) not finish sending you any in-progress data.
232/// Do not use `close`/`shutdown` to communicate anything besides
233/// "I am done using this stream."
234///
235// # Semver note
236//
237// Note that this type is re-exported as a part of the public API of
238// the `arti-client` crate.  Any changes to its API here in
239// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataWriter {
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
    ///
    /// Rate-limit changes arrive on a [`RateConfigStream`]; see [`DataWriter::new`].
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
}
245
246impl DataWriter {
247    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
248    fn new(
249        inner: DataWriterInner,
250        rate_limit_updates: watch::Receiver<StreamRateLimit>,
251        time_provider: DynTimeProvider,
252    ) -> Self {
253        /// Converts a `rate` into a `RateLimitedWriterConfig`.
254        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
255            let rate = rate.bytes_per_sec();
256            RateLimitedWriterConfig {
257                rate,        // bytes per second
258                burst: rate, // bytes
259                // This number is chosen arbitrarily, but the idea is that we want to balance
260                // between throughput and latency. Assume the user tries to write a large buffer
261                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
262                // frequently and writing a small number of bytes each time to the
263                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
264                // too large (for example 510), we'll be waking up infrequently to write a larger
265                // number of bytes each time. So even if the `DataWriterInner` has almost a full
266                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
267                // bytes before a cell can be sent, it will block until the rate limiter allows 510
268                // more bytes.
269                //
270                // TODO(arti#2028): Is there an optimal value here?
271                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
272            }
273        }
274
275        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
276        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();
277
278        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
279        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);
280
281        // build the rate limiter
282        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
283        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);
284
285        Self { writer }
286    }
287
288    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
289    /// interact with this stream without holding the stream itself.
290    #[cfg(feature = "stream-ctrl")]
291    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
292        Some(self.writer.inner().client_stream_ctrl())
293    }
294}
295
296impl AsyncWrite for DataWriter {
297    fn poll_write(
298        mut self: Pin<&mut Self>,
299        cx: &mut Context<'_>,
300        buf: &[u8],
301    ) -> Poll<IoResult<usize>> {
302        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
303    }
304
305    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
306        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
307    }
308
309    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
310        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
311    }
312}
313
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    // Each method adapts `self` (a futures-io writer) into a tokio-io writer
    // via `compat_write()`, then forwards the poll through that adapter.
    // The fully-qualified `TokioAsyncWrite::...` form is deliberate: both the
    // futures and tokio write traits are in scope, so method-call syntax
    // could be ambiguous here.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    // tokio calls this `shutdown`; it maps to the futures-io `close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}
328
329/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
330///
331/// See the [`DataStream`] docs for more information.
332///
333/// # Usage with Tokio
334///
335/// If the `tokio` crate feature is enabled, this type also implements
336/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
337/// with code that expects that trait.
338//
339// # Semver note
340//
341// Note that this type is re-exported as a part of the public API of
342// the `arti-client` crate.  Any changes to its API here in
343// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataReader {
    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
    ///
    /// (The wrapper is constructed in [`DataReader::new`].)
    reader: XonXoffReader<DataReaderInner>,
}
349
350impl DataReader {
351    /// Create a new [`DataReader`].
352    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
353        Self {
354            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
355        }
356    }
357
358    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
359    /// interact with this stream without holding the stream itself.
360    #[cfg(feature = "stream-ctrl")]
361    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
362        Some(self.reader.inner().client_stream_ctrl())
363    }
364}
365
366impl AsyncRead for DataReader {
367    fn poll_read(
368        mut self: Pin<&mut Self>,
369        cx: &mut Context<'_>,
370        buf: &mut [u8],
371    ) -> Poll<IoResult<usize>> {
372        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
373    }
374
375    fn poll_read_vectored(
376        mut self: Pin<&mut Self>,
377        cx: &mut Context<'_>,
378        bufs: &mut [std::io::IoSliceMut<'_>],
379    ) -> Poll<IoResult<usize>> {
380        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
381    }
382}
383
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    // Adapt `self` (a futures-io reader) into a tokio-io reader via `compat()`,
    // then forward the poll.  The fully-qualified call is deliberate: both the
    // futures and tokio read traits are in scope here.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
394
/// The inner reader for [`DataReader`].
///
/// This type is responsible for taking stream messages and extracting the stream data from them.
/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
#[derive(Debug)]
pub(crate) struct DataReaderInner {
    /// Internal state for this reader.
    ///
    /// This is stored in an Option so that we can mutate it in
    /// poll_read().  It might be possible to do better here, and we
    /// should refactor if so.
    ///
    /// Invariant: this is always `Some` except transiently, inside methods
    /// that `take()` the state and put it back before returning.
    state: Option<DataReaderState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
420
421impl BufferIsEmpty for DataReaderInner {
422    /// The result will become stale,
423    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
424    fn is_empty(mut self: Pin<&mut Self>) -> bool {
425        match self
426            .state
427            .as_mut()
428            .expect("forgot to put `DataReaderState` back")
429        {
430            DataReaderState::Open(imp) => {
431                // check if the partial cell in `pending` is empty,
432                // and if the message stream is empty
433                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
434            }
435            // closed, so any data should have been discarded
436            DataReaderState::Closed => true,
437        }
438    }
439}
440
/// Shared status flags for tracking the status of as `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really  eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.
    /// See `record_error` for how the two flags are set together.)
    received_err: bool,
}
473
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Remember that we've received a connected message.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Remember that we've received an error of some kind.
    ///
    /// An END with reason DONE counts as a clean close (no error flag);
    /// any other END is both a close and an error; everything else is an error.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        match e {
            Error::EndReceived(reason) => {
                self.received_end = true;
                if !matches!(reason, EndReason::DONE) {
                    self.received_err = true;
                }
            }
            _ => self.received_err = true,
        }
    }
}
496
restricted_msg! {
    /// An allowable incoming message on a client data stream.
    //
    // NOTE(review): presumably `restricted_msg!` makes any other relay command
    // a parse error on this stream — confirm against tor-cell's docs.
    enum ClientDataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
504
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    fn tunnel(&self) -> Option<Arc<ClientTunnel>> {
        // The upgrade succeeds only while some stream half keeps the tunnel alive.
        Weak::upgrade(&self.tunnel)
    }
}
513
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected. (That is, if it has
    /// received a `CONNECTED` message, and has not been closed.)
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        let closed_or_errored = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed_or_errored
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
526
impl DataStream {
    /// Wrap raw stream receiver and target parts as a DataStream.
    ///
    /// For non-optimistic stream, function `wait_for_connection`
    /// must be called after to make sure CONNECTED is received.
    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            false, // connected: we still expect a CONNECTED cell.
            memquota,
        )
    }

    /// Wrap raw stream receiver and target parts as a connected DataStream.
    ///
    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
    /// CONNECTED cell.
    ///
    /// This is used by hidden services, exit relays, and directory servers to accept streams.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            true, // connected: no CONNECTED cell is expected.
            memquota,
        )
    }

    /// The shared implementation of the `new*()` functions.
    ///
    /// Builds the reader and writer halves (plus, with `stream-ctrl`, the shared
    /// control/status objects), and assembles them into a `DataStream`.
    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        connected: bool,
        memquota: StreamAccount,
    ) -> Self {
        // The writer's buffer is sized to the largest DATA body permitted by
        // the negotiated relay cell format.
        let relay_cell_format = target.relay_cell_format();
        let out_buf_len = Data::max_body_len(relay_cell_format);
        let rate_limit_stream = target.rate_limit_stream().clone();

        // Shared status record; pre-marked as connected when no CONNECTED cell
        // is expected.
        #[cfg(feature = "stream-ctrl")]
        let status = {
            let mut data_stream_status = DataStreamStatus::default();
            if connected {
                data_stream_status.record_connected();
            }
            Arc::new(Mutex::new(data_stream_status))
        };

        #[cfg(feature = "stream-ctrl")]
        let ctrl = {
            // Hold the tunnel weakly: the stream halves' StreamTargets keep it alive.
            let tunnel = match target.tunnel() {
                crate::stream::Tunnel::Client(t) => Arc::downgrade(t),
                #[cfg(feature = "relay")]
                crate::stream::Tunnel::Relay(_) => panic!("created a relay tunnel in the client?!"),
            };

            Arc::new(ClientDataStreamCtrl {
                tunnel,
                status: status.clone(),
                _memquota: memquota.clone(),
            })
        };
        // Reader half: starts Open with an empty partial-cell buffer.
        let r = DataReaderInner {
            state: Some(DataReaderState::Open(DataReaderImpl {
                s: receiver,
                pending: Vec::new(),
                offset: 0,
                connected,
                #[cfg(feature = "stream-ctrl")]
                status: status.clone(),
            })),
            _memquota: memquota.clone(),
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };
        // Writer half: starts Ready with a zeroed, full-size output buffer.
        let w = DataWriterInner {
            state: Some(DataWriterState::Ready(DataWriterImpl {
                s: target,
                buf: vec![0; out_buf_len].into_boxed_slice(),
                n_pending: 0,
                #[cfg(feature = "stream-ctrl")]
                status,
                relay_cell_format,
            })),
            _memquota: memquota,
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };

        let time_provider = DynTimeProvider::new(time_provider);

        DataStream {
            w: DataWriter::new(w, rate_limit_stream, time_provider),
            r: DataReader::new(r, xon_xoff_reader_ctrl),
            #[cfg(feature = "stream-ctrl")]
            ctrl,
        }
    }

    /// Divide this DataStream into its constituent parts.
    pub fn split(self) -> (DataReader, DataWriter) {
        (self.r, self.w)
    }

    /// Wait until a CONNECTED cell is received, or some other cell
    /// is received to indicate an error.
    ///
    /// Does nothing if this stream is already connected.
    pub async fn wait_for_connection(&mut self) -> Result<()> {
        // We must put state back before returning
        let state = self
            .r
            .reader
            .inner_mut()
            .state
            .take()
            .expect("Missing state in DataReaderInner");

        if let DataReaderState::Open(mut imp) = state {
            let result = if imp.connected {
                Ok(())
            } else {
                // This succeeds if the cell is CONNECTED, and fails otherwise.
                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
            };
            // Restore the state: Closed on failure, Open (with `connected` now
            // recorded by read_cell) on success.
            self.r.reader.inner_mut().state = Some(match result {
                Err(_) => DataReaderState::Closed,
                Ok(_) => DataReaderState::Open(imp),
            });
            result
        } else {
            // The state was already Closed (or otherwise not Open): report a bug.
            Err(Error::from(internal!(
                "Expected ready state, got {:?}",
                state
            )))
        }
    }

    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Some(&self.ctrl)
    }
}
692
693impl AsyncRead for DataStream {
694    fn poll_read(
695        mut self: Pin<&mut Self>,
696        cx: &mut Context<'_>,
697        buf: &mut [u8],
698    ) -> Poll<IoResult<usize>> {
699        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
700    }
701}
702
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    // Adapt `self` (a futures-io reader) into a tokio-io reader via `compat()`,
    // then forward the poll.  The fully-qualified call avoids trait ambiguity,
    // since both read traits are in scope.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
713
714impl AsyncWrite for DataStream {
715    fn poll_write(
716        mut self: Pin<&mut Self>,
717        cx: &mut Context<'_>,
718        buf: &[u8],
719    ) -> Poll<IoResult<usize>> {
720        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
721    }
722    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
723        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
724    }
725    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
726        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
727    }
728}
729
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    // Adapt `self` (a futures-io writer) into a tokio-io writer via `compat()`,
    // then forward the poll.  Fully-qualified calls avoid trait ambiguity.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat()), cx)
    }

    // tokio calls this `shutdown`; it maps to the futures-io `close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat()), cx)
    }
}
744
/// Helper type: Like BoxFuture, but also requires that the future be Sync.
///
/// (Used by [`DataWriterState::Flushing`], which must not make the containing
/// stream types non-Sync.)
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
747
/// An enumeration for the state of a DataWriter.
///
/// We have to use an enum here because, for as long as we're waiting
/// for a flush operation to complete, the future returned by
/// `flush_cell()` owns the DataWriterImpl.
#[derive(Educe)]
#[educe(Debug)]
enum DataWriterState {
    /// The writer has closed or gotten an error: nothing more to do.
    Closed,
    /// The writer is not currently flushing; more data can get queued
    /// immediately.
    Ready(DataWriterImpl),
    /// The writer is flushing a cell.
    ///
    /// The future yields the `DataWriterImpl` back to us along with the
    /// outcome of the flush.
    Flushing(
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
    ),
}
767
/// Internal: the write part of a DataStream
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    ///
    /// Invariant: `n_pending <= buf.len()`.
    n_pending: usize,

    /// Relay cell format in use
    ///
    /// (Determines the maximum DATA body size, and hence `buf`'s length.)
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
797
798impl DataWriterInner {
    /// See [`DataWriter::client_stream_ctrl`].
    ///
    /// (Returns the shared control object directly; the public wrapper adds
    /// the `Option`.)
    #[cfg(feature = "stream-ctrl")]
    fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
804
805    /// Helper for poll_flush() and poll_close(): Performs a flush, then
806    /// closes the stream if should_close is true.
807    fn poll_flush_impl(
808        mut self: Pin<&mut Self>,
809        cx: &mut Context<'_>,
810        should_close: bool,
811    ) -> Poll<IoResult<()>> {
812        let state = self.state.take().expect("Missing state in DataWriter");
813
814        // TODO: this whole function is a bit copy-pasted.
815        let mut future: BoxSyncFuture<_> = match state {
816            DataWriterState::Ready(imp) => {
817                if imp.n_pending == 0 {
818                    // Nothing to flush!
819                    if should_close {
820                        // We need to actually continue with this function to do the closing.
821                        // Thus, make a future that does nothing and is ready immediately.
822                        Box::pin(futures::future::ready((imp, Ok(()))))
823                    } else {
824                        // There's nothing more to do; we can return.
825                        self.state = Some(DataWriterState::Ready(imp));
826                        return Poll::Ready(Ok(()));
827                    }
828                } else {
829                    // We need to flush the buffer's contents; Make a future for that.
830                    Box::pin(imp.flush_buf())
831                }
832            }
833            DataWriterState::Flushing(fut) => fut,
834            DataWriterState::Closed => {
835                self.state = Some(DataWriterState::Closed);
836                return Poll::Ready(Err(Error::NotConnected.into()));
837            }
838        };
839
840        match future.as_mut().poll(cx) {
841            Poll::Ready((_imp, Err(e))) => {
842                self.state = Some(DataWriterState::Closed);
843                Poll::Ready(Err(e.into()))
844            }
845            Poll::Ready((mut imp, Ok(()))) => {
846                if should_close {
847                    // Tell the StreamTarget to close, so that the reactor
848                    // realizes that we are done sending. (Dropping `imp.s` does not
849                    // suffice, since there may be other clones of it.  In particular,
850                    // the StreamReceiver has one, which it uses to keep the stream
851                    // open, among other things.)
852                    imp.s.close();
853
854                    #[cfg(feature = "stream-ctrl")]
855                    {
856                        // TODO RPC:  This is not sufficient to track every case
857                        // where we might have sent an End.  See note on the
858                        // `sent_end` field.
859                        imp.status.lock().expect("lock poisoned").sent_end = true;
860                    }
861                    self.state = Some(DataWriterState::Closed);
862                } else {
863                    self.state = Some(DataWriterState::Ready(imp));
864                }
865                Poll::Ready(Ok(()))
866            }
867            Poll::Pending => {
868                self.state = Some(DataWriterState::Flushing(future));
869                Poll::Pending
870            }
871        }
872    }
873}
874
impl AsyncWrite for DataWriterInner {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        // A zero-length write always succeeds immediately.
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // We take the state out of the writer here; every return path below
        // must store a state back into `self.state` before returning.
        let state = self.state.take().expect("Missing state in DataWriter");

        let mut future = match state {
            DataWriterState::Ready(mut imp) => {
                let n_queued = imp.queue_bytes(buf);
                if n_queued != 0 {
                    self.state = Some(DataWriterState::Ready(imp));
                    return Poll::Ready(Ok(n_queued));
                }
                // we couldn't queue anything, so the current cell must be full.
                Box::pin(imp.flush_buf())
            }
            // A flush was already in progress: resume polling that same future.
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            // `_imp` carries a leading underscore because it is only read when
            // the `stream-ctrl` feature is enabled.
            Poll::Ready((_imp, Err(e))) => {
                #[cfg(feature = "stream-ctrl")]
                {
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
                }
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                // Great!  We're done flushing.  Queue as much as we can of this
                // cell.
                let n_queued = imp.queue_bytes(buf);
                self.state = Some(DataWriterState::Ready(imp));
                Poll::Ready(Ok(n_queued))
            }
            Poll::Pending => {
                // Flush not finished; stash the future so the next poll resumes it.
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, false)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        // Closing is a flush followed by sending a close on the StreamTarget.
        self.poll_flush_impl(cx, true)
    }
}
935
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    // These methods delegate to the `futures::io::AsyncWrite` implementation
    // above, via the `tokio-util` compatibility adapter
    // (`FuturesAsyncWriteCompatExt::compat_write`).
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}
950
951impl DataWriterImpl {
952    /// Try to flush the current buffer contents as a data cell.
953    async fn flush_buf(mut self) -> (Self, Result<()>) {
954        let result = if let Some((cell, remainder)) =
955            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
956        {
957            // TODO: Eventually we may want a larger buffer; if we do,
958            // this invariant will become false.
959            assert!(remainder.is_empty());
960            self.n_pending = 0;
961            self.s.send(cell.into()).await
962        } else {
963            Ok(())
964        };
965
966        (self, result)
967    }
968
969    /// Add as many bytes as possible from `b` to our internal buffer;
970    /// return the number we were able to add.
971    fn queue_bytes(&mut self, b: &[u8]) -> usize {
972        let empty_space = &mut self.buf[self.n_pending..];
973        if empty_space.is_empty() {
974            // that is, len == 0
975            return 0;
976        }
977
978        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
979        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
980        self.n_pending += n_to_copy;
981        n_to_copy
982    }
983}
984
impl DataReaderInner {
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    ///
    /// The handle is shared via `Arc`, so callers may clone it and keep it
    /// alive independently of this reader.
    #[cfg(feature = "stream-ctrl")]
    pub(crate) fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
}
993
/// An enumeration for the state of a [`DataReaderInner`].
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    ///
    /// Further reads on a closed reader return an error (see `poll_read`).
    Closed,
    /// In this state the reader is open.
    Open(DataReaderImpl),
}
1009
/// Wrapper for the read part of a [`DataStream`].
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    ///
    /// Invariant: `offset <= pending.len()` (maintained by `extract_bytes`,
    /// which never advances past the end of `pending`).
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
1037
impl AsyncRead for DataReaderInner {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // We're pulling the state object out of the reader.  We MUST
        // put it back before this function returns.
        let mut state = self.state.take().expect("Missing state in DataReaderInner");

        // Loop until we can deliver buffered bytes, hit EOF or an error, or
        // find that no cell is ready yet (Poll::Pending).
        loop {
            let mut imp = match state {
                DataReaderState::Open(mut imp) => {
                    // There may be data to read already.
                    let n_copied = imp.extract_bytes(buf);
                    if n_copied != 0 || buf.is_empty() {
                        // We read data into the buffer, or the buffer was 0-len to begin with.
                        // Tell the caller.
                        self.state = Some(DataReaderState::Open(imp));
                        return Poll::Ready(Ok(n_copied));
                    }

                    // No data available!  We have to try reading.
                    imp
                }
                DataReaderState::Closed => {
                    // TODO: Why are we returning an error rather than continuing to return EOF?
                    self.state = Some(DataReaderState::Closed);
                    return Poll::Ready(Err(Error::NotConnected.into()));
                }
            };

            // See if a cell is ready.
            match Pin::new(&mut imp).read_cell(cx) {
                Poll::Ready(Err(e)) => {
                    // There aren't any survivable errors in the current
                    // design.
                    self.state = Some(DataReaderState::Closed);
                    #[cfg(feature = "stream-ctrl")]
                    {
                        imp.status.lock().expect("lock poisoned").record_error(&e);
                    }
                    // An END cell with reason DONE is a clean EOF (a 0-byte
                    // read); any other error surfaces as an I/O error.
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
                        Ok(0)
                    } else {
                        Err(e.into())
                    };
                    return Poll::Ready(result);
                }
                Poll::Ready(Ok(())) => {
                    // It read a cell!  Continue the loop.
                    state = DataReaderState::Open(imp);
                }
                Poll::Pending => {
                    // No cells ready, so tell the
                    // caller to get back to us later.
                    self.state = Some(DataReaderState::Open(imp));
                    return Poll::Pending;
                }
            }
        }
    }
}
1101
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    // Delegates to the `futures::io::AsyncRead` implementation above, via the
    // `tokio-util` compatibility adapter (`FuturesAsyncReadCompatExt::compat`).
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
1112
1113impl DataReaderImpl {
1114    /// Pull as many bytes as we can off of self.pending, and return that
1115    /// number of bytes.
1116    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
1117        let remainder = &self.pending[self.offset..];
1118        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
1119        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
1120        self.offset += n_to_copy;
1121
1122        n_to_copy
1123    }
1124
1125    /// Return true iff there are no buffered bytes here to yield
1126    fn buf_is_empty(&self) -> bool {
1127        self.pending.len() == self.offset
1128    }
1129
1130    /// Load self.pending with the contents of a new data cell.
1131    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1132        use ClientDataStreamMsg::*;
1133        let msg = match self.as_mut().project().s.poll_next(cx) {
1134            Poll::Pending => return Poll::Pending,
1135            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<ClientDataStreamMsg>() {
1136                Ok(cell) => cell.into_msg(),
1137                Err(e) => {
1138                    self.s.protocol_error();
1139                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
1140                }
1141            },
1142            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
1143            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
1144            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
1145            // terminated, we should be returning `None` here and not considering it as an error.
1146            // The `StreamReceiver` will have already returned an error if the cell stream was
1147            // terminated without an END message.
1148            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
1149        };
1150
1151        let result = match msg {
1152            Connected(_) if !self.connected => {
1153                self.connected = true;
1154                #[cfg(feature = "stream-ctrl")]
1155                {
1156                    self.status
1157                        .lock()
1158                        .expect("poisoned lock")
1159                        .record_connected();
1160                }
1161                Ok(())
1162            }
1163            Connected(_) => {
1164                self.s.protocol_error();
1165                Err(Error::StreamProto(
1166                    "Received a second connect cell on a data stream".to_string(),
1167                ))
1168            }
1169            Data(d) if self.connected => {
1170                self.add_data(d.into());
1171                Ok(())
1172            }
1173            Data(_) => {
1174                self.s.protocol_error();
1175                Err(Error::StreamProto(
1176                    "Received a data cell an unconnected stream".to_string(),
1177                ))
1178            }
1179            End(e) => Err(Error::EndReceived(e.reason())),
1180        };
1181
1182        Poll::Ready(result)
1183    }
1184
1185    /// Add the data from `d` to the end of our pending bytes.
1186    fn add_data(&mut self, mut d: Vec<u8>) {
1187        if self.buf_is_empty() {
1188            // No data pending?  Just take d as the new pending.
1189            self.pending = d;
1190            self.offset = 0;
1191        } else {
1192            // TODO(nickm) This has potential to grow `pending` without bound.
1193            // Fortunately, we don't currently read cells or call this
1194            // `add_data` method when pending is nonempty—but if we do in the
1195            // future, we'll have to be careful here.
1196            self.pending.append(&mut d);
1197        }
1198    }
1199}
1200
/// A `CmdChecker` that enforces invariants for outbound data streams.
#[derive(Debug)]
pub(crate) struct OutboundDataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    ///
    /// Starts out true (see the `Default` impl) and is cleared when the first
    /// CONNECTED message passes through `check_msg`.
    expecting_connected: bool,
}
1207
1208impl Default for OutboundDataCmdChecker {
1209    fn default() -> Self {
1210        Self {
1211            expecting_connected: true,
1212        }
1213    }
1214}
1215
1216impl CmdChecker for OutboundDataCmdChecker {
1217    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
1218        use StreamStatus::*;
1219        match msg.cmd() {
1220            RelayCmd::CONNECTED => {
1221                if !self.expecting_connected {
1222                    Err(Error::StreamProto(
1223                        "Received CONNECTED twice on a stream.".into(),
1224                    ))
1225                } else {
1226                    self.expecting_connected = false;
1227                    Ok(Open)
1228                }
1229            }
1230            RelayCmd::DATA => {
1231                if !self.expecting_connected {
1232                    Ok(Open)
1233                } else {
1234                    Err(Error::StreamProto(
1235                        "Received DATA before CONNECTED on a stream".into(),
1236                    ))
1237                }
1238            }
1239            RelayCmd::END => Ok(Closed),
1240            _ => Err(Error::StreamProto(format!(
1241                "Unexpected {} on a data stream!",
1242                msg.cmd()
1243            ))),
1244        }
1245    }
1246
1247    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1248        let _ = msg
1249            .decode::<ClientDataStreamMsg>()
1250            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1251        Ok(())
1252    }
1253}
1254
1255impl OutboundDataCmdChecker {
1256    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
1257    /// constructed connection.
1258    pub(crate) fn new_any() -> AnyCmdChecker {
1259        Box::<Self>::default()
1260    }
1261}