tor_proto/client/stream/data.rs

//! Declare DataStream, a type that wraps RawCellStream so as to be useful
//! for byte-oriented communication.

use crate::{Error, Result};
use static_assertions::assert_impl_all;
use tor_cell::relaycell::msg::EndReason;
use tor_cell::relaycell::{RelayCellFormat, RelayCmd};

use futures::io::{AsyncRead, AsyncWrite};
use futures::stream::StreamExt;
use futures::task::{Context, Poll};
use futures::{Future, Stream};
use pin_project::pin_project;
use postage::watch;

#[cfg(feature = "tokio")]
use tokio_crate::io::ReadBuf;
#[cfg(feature = "tokio")]
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
#[cfg(feature = "tokio")]
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use tor_cell::restricted_msg;

use std::fmt::Debug;
use std::io::Result as IoResult;
use std::num::NonZero;
use std::pin::Pin;
#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
use std::sync::Arc;
#[cfg(feature = "stream-ctrl")]
use std::sync::{Mutex, Weak};

use educe::Educe;

use crate::client::stream::StreamReceiver;
use crate::client::{ClientTunnel, StreamTarget};
use crate::memquota::StreamAccount;
use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
use crate::stream::flow_ctrl::state::StreamRateLimit;
use crate::stream::flow_ctrl::xon_xoff::reader::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
use crate::util::token_bucket::dynamic_writer::DynamicRateLimitedWriter;
use crate::util::token_bucket::writer::{RateLimitedWriter, RateLimitedWriterConfig};
use tor_basic_utils::skip_fmt;
use tor_cell::relaycell::msg::Data;
use tor_error::internal;
use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};

/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
///
/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
/// [`DataWriter`], which leaves us with this ugly type.
/// We use a type alias to make `DataWriter` a little nicer.
type RateConfigStream = futures::stream::Map<
    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
    fn(StreamRateLimit) -> RateLimitedWriterConfig,
>;

/// An anonymized stream over the Tor network.
///
/// For most purposes, you can think of this type as an anonymized
/// TCP stream: it can read and write data, and get closed when it's done.
///
/// [`DataStream`] implements [`futures::io::AsyncRead`] and
/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
/// traits are expected.
///
/// # Examples
///
/// Connecting to an HTTP server and sending a request, using
/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
///
/// ```ignore
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
///
/// use futures::io::AsyncWriteExt;
///
/// stream
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
///     .await?;
///
/// // Flushing the stream is important; see below!
/// stream.flush().await?;
/// ```
///
/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
///
/// ```ignore
/// use futures::io::AsyncReadExt;
///
/// let mut buf = Vec::new();
/// stream.read_to_end(&mut buf).await?;
///
/// println!("{}", String::from_utf8_lossy(&buf));
/// ```
///
/// # Usage with Tokio
///
/// If the `tokio` crate feature is enabled, this type also implements
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
/// with code that expects those traits.
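///
/// For example, with the `tokio` feature enabled, the request above can be written
/// with Tokio's I/O extension traits instead (a sketch; `tor_client` is assumed as
/// in the earlier examples):
///
/// ```ignore
/// use tokio::io::{AsyncReadExt, AsyncWriteExt};
///
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
///
/// stream
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
///     .await?;
/// stream.flush().await?;
///
/// let mut buf = Vec::new();
/// stream.read_to_end(&mut buf).await?;
/// ```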
///
/// # Remember to call `flush`!
///
/// DataStream buffers data internally, in order to write as few cells
/// as possible onto the network.  In order to make sure that your
/// data has actually been sent, you need to make sure that
/// [`AsyncWrite::poll_flush`] runs to completion: probably via
/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
///
/// # Splitting the type
///
/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
/// `DataStream::split` method can be used to split it into those two parts, for more
/// convenient usage with e.g. stream combinators.
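///
/// For example (a sketch, reusing the `stream` from the examples above):
///
/// ```ignore
/// use futures::io::{AsyncReadExt, AsyncWriteExt};
///
/// let (mut reader, mut writer) = stream.split();
///
/// // The two halves can now be moved into separate tasks or futures.
/// writer.write_all(b"ping").await?;
/// writer.flush().await?;
///
/// let mut buf = vec![0_u8; 16];
/// let n = reader.read(&mut buf).await?;
/// ```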
///
/// # How long does a stream live?
///
/// A `DataStream` will live until all references to it are dropped,
/// or until it is closed explicitly.
///
/// If you split the stream into a `DataReader` and a `DataWriter`, it
/// will survive until _both_ are dropped, or until it is closed
/// explicitly.
///
/// A stream can also close because of a network error,
/// or because the other side of the stream decided to close it.
///
// # Semver note
//
// Note that this type is re-exported as a part of the public API of
// the `arti-client` crate.  Any changes to its API here in
// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataStream {
    /// Underlying writer for this stream
    w: DataWriter,
    /// Underlying reader for this stream
    r: DataReader,
    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
assert_impl_all! { DataStream: Send, Sync }

/// An object used to control and monitor a data stream.
///
/// # Notes
///
/// This is a separate type from [`DataStream`] because it's useful to have
/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak circuits.
    tunnel: Weak<ClientTunnel>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    _memquota: StreamAccount,
}

/// The inner writer for [`DataWriter`].
///
/// This type is responsible for taking bytes and packaging them into cells.
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
#[derive(Debug)]
struct DataWriterInner {
    /// Internal state for this writer
    ///
    /// This is stored in an Option so that we can mutate it in the
    /// AsyncWrite functions.  It might be possible to do better here,
    /// and we should refactor if so.
    state: Option<DataWriterState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional; see DataReaderInner.memquota
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}

/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
///
/// See the [`DataStream`] docs for more information. In particular, note
/// that this writer requires `poll_flush` to complete in order to guarantee that
/// all data has been written.
///
/// # Usage with Tokio
///
/// If the `tokio` crate feature is enabled, this type also implements
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
/// with code that expects that trait.
///
/// # Drop and close
///
/// Note that dropping a `DataWriter` has no special effect on its own:
/// if the `DataWriter` is dropped, the underlying stream will still remain open
/// until the `DataReader` is also dropped.
///
/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
///
/// Remember that Tor does not support half-open streams:
/// If you `close` or `shutdown` a stream,
/// the other side will not see the stream as half-open,
/// and so will (probably) not finish sending you any in-progress data.
/// Do not use `close`/`shutdown` to communicate anything besides
/// "I am done using this stream."
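///
/// For example (a sketch, using a `writer` obtained from `DataStream::split`):
///
/// ```ignore
/// use futures::io::AsyncWriteExt;
///
/// // Closing flushes any buffered data first, then signals that we are
/// // done sending on this stream.
/// writer.close().await?;
/// ```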
///
// # Semver note
//
// Note that this type is re-exported as a part of the public API of
// the `arti-client` crate.  Any changes to its API here in
// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataWriter {
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
}

impl DataWriter {
    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
    fn new(
        inner: DataWriterInner,
        rate_limit_updates: watch::Receiver<StreamRateLimit>,
        time_provider: DynTimeProvider,
    ) -> Self {
        /// Converts a `rate` into a `RateLimitedWriterConfig`.
        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
            let rate = rate.bytes_per_sec();
            RateLimitedWriterConfig {
                rate,        // bytes per second
                burst: rate, // bytes
                // This number is chosen arbitrarily, but the idea is that we want to balance
                // between throughput and latency. Assume the user tries to write a large buffer
                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
                // frequently and writing a small number of bytes each time to the
                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
                // too large (for example 510), we'll be waking up infrequently to write a larger
                // number of bytes each time. So even if the `DataWriterInner` has almost a full
                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
                // bytes before a cell can be sent, it will block until the rate limiter allows 510
                // more bytes.
                //
                // TODO(arti#2028): Is there an optimal value here?
                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
            }
        }

        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();

        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);

        // build the rate limiter
        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);

        Self { writer }
    }

    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Some(self.writer.inner().client_stream_ctrl())
    }
}

impl AsyncWrite for DataWriter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
    }
}

#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}

/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
///
/// See the [`DataStream`] docs for more information.
///
/// # Usage with Tokio
///
/// If the `tokio` crate feature is enabled, this type also implements
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
/// with code that expects that trait.
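///
/// For example (a sketch; `reader` is an assumed [`DataReader`] obtained from
/// `DataStream::split`, with the `tokio` feature enabled):
///
/// ```ignore
/// use tokio::io::AsyncReadExt;
///
/// let mut buf = vec![0_u8; 1024];
/// let n = reader.read(&mut buf).await?;
/// ```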
//
// # Semver note
//
// Note that this type is re-exported as a part of the public API of
// the `arti-client` crate.  Any changes to its API here in
// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataReader {
    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
    reader: XonXoffReader<DataReaderInner>,
}

impl DataReader {
    /// Create a new [`DataReader`].
    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
        Self {
            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
        }
    }

    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Some(self.reader.inner().client_stream_ctrl())
    }
}

impl AsyncRead for DataReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
    }

    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [std::io::IoSliceMut<'_>],
    ) -> Poll<IoResult<usize>> {
        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
    }
}

#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}

/// The inner reader for [`DataReader`].
///
/// This type is responsible for taking stream messages and extracting the stream data from them.
/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
#[derive(Debug)]
pub(crate) struct DataReaderInner {
    /// Internal state for this reader.
    ///
    /// This is stored in an Option so that we can mutate it in
    /// poll_read().  It might be possible to do better here, and we
    /// should refactor if so.
    state: Option<DataReaderState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl")),
    // since ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}

impl BufferIsEmpty for DataReaderInner {
    /// The result will become stale,
    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
    fn is_empty(mut self: Pin<&mut Self>) -> bool {
        match self
            .state
            .as_mut()
            .expect("forgot to put `DataReaderState` back")
        {
            DataReaderState::Open(imp) => {
                // check if the partial cell in `pending` is empty,
                // and if the message stream is empty
                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
            }
            // closed, so any data should have been discarded
            DataReaderState::Closed => true,
        }
    }
}

/// Shared status flags for tracking the status of a `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}

#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Remember that we've received a connected message.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Remember that we've received an error of some kind.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        match e {
            Error::EndReceived(EndReason::DONE) => self.received_end = true,
            Error::EndReceived(_) => {
                self.received_end = true;
                self.received_err = true;
            }
            _ => self.received_err = true,
        }
    }
}

restricted_msg! {
    /// An allowable incoming message on a client data stream.
    enum ClientDataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}

// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    fn tunnel(&self) -> Option<Arc<ClientTunnel>> {
        self.tunnel.upgrade()
    }
}

#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected. (That is, if it has
    /// received a `CONNECTED` message, and has not been closed.)
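    ///
    /// For example (a sketch; `stream` is an assumed open [`DataStream`]):
    ///
    /// ```ignore
    /// if let Some(ctrl) = stream.client_stream_ctrl() {
    ///     if ctrl.is_connected() {
    ///         // A CONNECTED message has been received, and the stream
    ///         // has not been closed or hit an error.
    ///     }
    /// }
    /// ```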
    pub fn is_connected(&self) -> bool {
        let s = self.status.lock().expect("poisoned lock");
        s.received_connected && !(s.sent_end || s.received_end || s.received_err)
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}

impl DataStream {
    /// Wrap raw stream receiver and target parts as a DataStream.
    ///
    /// For a non-optimistic stream, `wait_for_connection`
    /// must be called afterwards to make sure a CONNECTED message is received.
    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            false,
            memquota,
        )
    }

    /// Wrap raw stream receiver and target parts as a connected DataStream.
    ///
    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
    /// CONNECTED cell.
    ///
    /// This is used by hidden services, exit relays, and directory servers to accept streams.
    #[cfg(feature = "hs-service")]
    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            true,
            memquota,
        )
    }

    /// The shared implementation of the `new*()` functions.
    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        connected: bool,
        memquota: StreamAccount,
    ) -> Self {
        let relay_cell_format = target.relay_cell_format();
        let out_buf_len = Data::max_body_len(relay_cell_format);
        let rate_limit_stream = target.rate_limit_stream().clone();

        #[cfg(feature = "stream-ctrl")]
        let status = {
            let mut data_stream_status = DataStreamStatus::default();
            if connected {
                data_stream_status.record_connected();
            }
            Arc::new(Mutex::new(data_stream_status))
        };

        #[cfg(feature = "stream-ctrl")]
        let ctrl = Arc::new(ClientDataStreamCtrl {
            tunnel: Arc::downgrade(target.tunnel()),
            status: status.clone(),
            _memquota: memquota.clone(),
        });
        let r = DataReaderInner {
            state: Some(DataReaderState::Open(DataReaderImpl {
                s: receiver,
                pending: Vec::new(),
                offset: 0,
                connected,
                #[cfg(feature = "stream-ctrl")]
                status: status.clone(),
            })),
            _memquota: memquota.clone(),
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };
        let w = DataWriterInner {
            state: Some(DataWriterState::Ready(DataWriterImpl {
                s: target,
                buf: vec![0; out_buf_len].into_boxed_slice(),
                n_pending: 0,
                #[cfg(feature = "stream-ctrl")]
                status,
                relay_cell_format,
            })),
            _memquota: memquota,
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };

        let time_provider = DynTimeProvider::new(time_provider);

        DataStream {
            w: DataWriter::new(w, rate_limit_stream, time_provider),
            r: DataReader::new(r, xon_xoff_reader_ctrl),
            #[cfg(feature = "stream-ctrl")]
            ctrl,
        }
    }

    /// Divide this DataStream into its constituent parts.
    pub fn split(self) -> (DataReader, DataWriter) {
        (self.r, self.w)
    }

    /// Wait until a CONNECTED cell is received, or some other cell
    /// is received to indicate an error.
    ///
    /// Does nothing if this stream is already connected.
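    ///
    /// For example (a sketch; `stream` is an assumed newly created, not-yet-connected
    /// [`DataStream`]):
    ///
    /// ```ignore
    /// // Block until the other end confirms the stream, or return the error
    /// // that arrived instead (for example, an END message).
    /// stream.wait_for_connection().await?;
    /// ```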
    pub async fn wait_for_connection(&mut self) -> Result<()> {
        // We must put state back before returning
        let state = self
            .r
            .reader
            .inner_mut()
            .state
            .take()
            .expect("Missing state in DataReaderInner");

        if let DataReaderState::Open(mut imp) = state {
            let result = if imp.connected {
                Ok(())
            } else {
                // This succeeds if the cell is CONNECTED, and fails otherwise.
                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
            };
            self.r.reader.inner_mut().state = Some(match result {
                Err(_) => DataReaderState::Closed,
                Ok(_) => DataReaderState::Open(imp),
            });
            result
        } else {
            Err(Error::from(internal!(
                "Expected ready state, got {:?}",
                state
            )))
        }
    }

    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Some(&self.ctrl)
    }
}

impl AsyncRead for DataStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
    }
}

#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}

impl AsyncWrite for DataStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
    }
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
    }
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
    }
}

#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat()), cx)
    }
}

/// Helper type: Like BoxFuture, but also requires that the future be Sync.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;

/// An enumeration for the state of a DataWriter.
///
/// We have to use an enum here because, for as long as we're waiting
/// for a flush operation to complete, the future returned by
/// `flush_buf()` owns the DataWriterImpl.
#[derive(Educe)]
#[educe(Debug)]
enum DataWriterState {
    /// The writer has closed or gotten an error: nothing more to do.
    Closed,
    /// The writer is not currently flushing; more data can get queued
    /// immediately.
    Ready(DataWriterImpl),
    /// The writer is flushing a cell.
    Flushing(
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
    ),
}

/// Internal: the write part of a DataStream
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    n_pending: usize,

    /// Relay cell format in use
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}

impl DataWriterInner {
    /// See [`DataWriter::client_stream_ctrl`].
    #[cfg(feature = "stream-ctrl")]
    fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }

    /// Helper for poll_flush() and poll_close(): Performs a flush, then
    /// closes the stream if should_close is true.
    fn poll_flush_impl(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        should_close: bool,
    ) -> Poll<IoResult<()>> {
        let state = self.state.take().expect("Missing state in DataWriter");

        // TODO: this whole function is a bit copy-pasted.
        let mut future: BoxSyncFuture<_> = match state {
            DataWriterState::Ready(imp) => {
                if imp.n_pending == 0 {
                    // Nothing to flush!
                    if should_close {
                        // We need to actually continue with this function to do the closing.
                        // Thus, make a future that does nothing and is ready immediately.
                        Box::pin(futures::future::ready((imp, Ok(()))))
                    } else {
                        // There's nothing more to do; we can return.
                        self.state = Some(DataWriterState::Ready(imp));
                        return Poll::Ready(Ok(()));
                    }
                } else {
                    // We need to flush the buffer's contents; Make a future for that.
                    Box::pin(imp.flush_buf())
                }
            }
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                if should_close {
                    // Tell the StreamTarget to close, so that the reactor
                    // realizes that we are done sending. (Dropping `imp.s` does not
                    // suffice, since there may be other clones of it.  In particular,
                    // the StreamReceiver has one, which it uses to keep the stream
                    // open, among other things.)
                    imp.s.close();

                    #[cfg(feature = "stream-ctrl")]
                    {
                        // TODO RPC:  This is not sufficient to track every case
                        // where we might have sent an End.  See note on the
                        // `sent_end` field.
                        imp.status.lock().expect("lock poisoned").sent_end = true;
                    }
                    self.state = Some(DataWriterState::Closed);
                } else {
                    self.state = Some(DataWriterState::Ready(imp));
                }
                Poll::Ready(Ok(()))
            }
            Poll::Pending => {
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }
}

impl AsyncWrite for DataWriterInner {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let state = self.state.take().expect("Missing state in DataWriter");

        let mut future = match state {
            DataWriterState::Ready(mut imp) => {
                let n_queued = imp.queue_bytes(buf);
                if n_queued != 0 {
                    self.state = Some(DataWriterState::Ready(imp));
                    return Poll::Ready(Ok(n_queued));
                }
                // we couldn't queue anything, so the current cell must be full.
                Box::pin(imp.flush_buf())
            }
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                #[cfg(feature = "stream-ctrl")]
                {
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
                }
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                // Great!  We're done flushing.  Queue as much as we can of this
                // cell.
                let n_queued = imp.queue_bytes(buf);
                self.state = Some(DataWriterState::Ready(imp));
                Poll::Ready(Ok(n_queued))
            }
            Poll::Pending => {
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, false)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, true)
    }
}

#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}

impl DataWriterImpl {
    /// Try to flush the current buffer contents as a data cell.
    async fn flush_buf(mut self) -> (Self, Result<()>) {
        let result = if let Some((cell, remainder)) =
            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
        {
            // TODO: Eventually we may want a larger buffer; if we do,
            // this invariant will become false.
            assert!(remainder.is_empty());
            self.n_pending = 0;
            self.s.send(cell.into()).await
        } else {
            Ok(())
        };

        (self, result)
    }

    /// Add as many bytes as possible from `b` to our internal buffer;
    /// return the number we were able to add.
    fn queue_bytes(&mut self, b: &[u8]) -> usize {
        let empty_space = &mut self.buf[self.n_pending..];
        if empty_space.is_empty() {
            // that is, len == 0
            return 0;
        }

        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
        self.n_pending += n_to_copy;
        n_to_copy
    }
}

impl DataReaderInner {
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub(crate) fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
}

/// An enumeration for the state of a [`DataReaderInner`].
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    Closed,
    /// In this state the reader is open.
    Open(DataReaderImpl),
}

/// Wrapper for the read part of a [`DataStream`].
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}

impl AsyncRead for DataReaderInner {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // We're pulling the state object out of the reader.  We MUST
        // put it back before this function returns.
        let mut state = self.state.take().expect("Missing state in DataReaderInner");

        loop {
            let mut imp = match state {
                DataReaderState::Open(mut imp) => {
                    // There may be data to read already.
                    let n_copied = imp.extract_bytes(buf);
                    if n_copied != 0 || buf.is_empty() {
                        // We read data into the buffer, or the buffer was 0-len to begin with.
                        // Tell the caller.
                        self.state = Some(DataReaderState::Open(imp));
                        return Poll::Ready(Ok(n_copied));
                    }

                    // No data available!  We have to try reading.
                    imp
                }
                DataReaderState::Closed => {
                    // TODO: Why are we returning an error rather than continuing to return EOF?
                    self.state = Some(DataReaderState::Closed);
                    return Poll::Ready(Err(Error::NotConnected.into()));
                }
            };

            // See if a cell is ready.
            match Pin::new(&mut imp).read_cell(cx) {
                Poll::Ready(Err(e)) => {
                    // There aren't any survivable errors in the current
                    // design.
                    self.state = Some(DataReaderState::Closed);
                    #[cfg(feature = "stream-ctrl")]
                    {
                        imp.status.lock().expect("lock poisoned").record_error(&e);
                    }
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
                        Ok(0)
                    } else {
                        Err(e.into())
                    };
                    return Poll::Ready(result);
                }
                Poll::Ready(Ok(())) => {
                    // It read a cell!  Continue the loop.
                    state = DataReaderState::Open(imp);
                }
                Poll::Pending => {
                    // No cells ready, so tell the
                    // caller to get back to us later.
                    self.state = Some(DataReaderState::Open(imp));
                    return Poll::Pending;
                }
            }
        }
    }
}

#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}

impl DataReaderImpl {
    /// Pull as many bytes as we can off of self.pending, and return that
    /// number of bytes.
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
        let remainder = &self.pending[self.offset..];
        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
        self.offset += n_to_copy;

        n_to_copy
    }

    /// Return true iff there are no buffered bytes here to yield
    fn buf_is_empty(&self) -> bool {
        self.pending.len() == self.offset
    }

    /// Load self.pending with the contents of a new data cell.
    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        use ClientDataStreamMsg::*;
        let msg = match self.as_mut().project().s.poll_next(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<ClientDataStreamMsg>() {
                Ok(cell) => cell.into_msg(),
                Err(e) => {
                    self.s.protocol_error();
                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
                }
            },
            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
            // terminated, we should be returning `None` here and not considering it as an error.
            // The `StreamReceiver` will have already returned an error if the cell stream was
            // terminated without an END message.
            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
        };

        let result = match msg {
            Connected(_) if !self.connected => {
                self.connected = true;
                #[cfg(feature = "stream-ctrl")]
                {
                    self.status
                        .lock()
                        .expect("poisoned lock")
                        .record_connected();
                }
                Ok(())
            }
            Connected(_) => {
                self.s.protocol_error();
                Err(Error::StreamProto(
                    "Received a second connect cell on a data stream".to_string(),
                ))
            }
            Data(d) if self.connected => {
                self.add_data(d.into());
                Ok(())
            }
            Data(_) => {
                self.s.protocol_error();
                Err(Error::StreamProto(
                    "Received a data cell on an unconnected stream".to_string(),
                ))
            }
            End(e) => Err(Error::EndReceived(e.reason())),
        };

        Poll::Ready(result)
    }

    /// Add the data from `d` to the end of our pending bytes.
    fn add_data(&mut self, mut d: Vec<u8>) {
        if self.buf_is_empty() {
            // No data pending?  Just take d as the new pending.
            self.pending = d;
            self.offset = 0;
        } else {
            // TODO(nickm) This has potential to grow `pending` without bound.
            // Fortunately, we don't currently read cells or call this
            // `add_data` method when pending is nonempty—but if we do in the
            // future, we'll have to be careful here.
            self.pending.append(&mut d);
        }
    }
}

/// A `CmdChecker` that enforces invariants for outbound data streams.
#[derive(Debug)]
pub(crate) struct OutboundDataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    expecting_connected: bool,
}

impl Default for OutboundDataCmdChecker {
    fn default() -> Self {
        Self {
            expecting_connected: true,
        }
    }
}

impl CmdChecker for OutboundDataCmdChecker {
    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
        use StreamStatus::*;
        match msg.cmd() {
            RelayCmd::CONNECTED => {
                if !self.expecting_connected {
                    Err(Error::StreamProto(
                        "Received CONNECTED twice on a stream.".into(),
                    ))
                } else {
                    self.expecting_connected = false;
                    Ok(Open)
                }
            }
            RelayCmd::DATA => {
                if !self.expecting_connected {
                    Ok(Open)
                } else {
                    Err(Error::StreamProto(
                        "Received DATA before CONNECTED on a stream".into(),
                    ))
                }
            }
            RelayCmd::END => Ok(Closed),
            _ => Err(Error::StreamProto(format!(
                "Unexpected {} on a data stream!",
                msg.cmd()
            ))),
        }
    }

    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
        let _ = msg
            .decode::<ClientDataStreamMsg>()
            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
        Ok(())
    }
}

impl OutboundDataCmdChecker {
    /// Return a new boxed `OutboundDataCmdChecker` in a state suitable for a newly
    /// constructed connection.
    pub(crate) fn new_any() -> AnyCmdChecker {
        Box::<Self>::default()
    }
}