Skip to main content

tor_proto/client/stream/
data.rs

1//! Declare DataStream, a type that wraps DataReader and DataWriter so as to be useful
2//! for byte-oriented communication.
3
4use crate::{Error, Result};
5use static_assertions::assert_impl_all;
6use tor_cell::relaycell::msg::EndReason;
7use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::stream::StreamExt;
11use futures::task::{Context, Poll};
12use futures::{Future, Stream};
13use pin_project::pin_project;
14use postage::watch;
15
16#[cfg(feature = "tokio")]
17use tokio_crate::io::ReadBuf;
18#[cfg(feature = "tokio")]
19use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
20#[cfg(feature = "tokio")]
21use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
22use tor_cell::restricted_msg;
23
24use std::fmt::Debug;
25use std::io::Result as IoResult;
26use std::num::NonZero;
27use std::pin::Pin;
28#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
29use std::sync::Arc;
30#[cfg(feature = "stream-ctrl")]
31use std::sync::{Mutex, Weak};
32
33use educe::Educe;
34
35use crate::client::ClientTunnel;
36use crate::memquota::StreamAccount;
37use crate::stream::StreamReceiver;
38use crate::stream::StreamTarget;
39use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
40use crate::stream::flow_ctrl::state::StreamRateLimit;
41use crate::stream::flow_ctrl::xon_xoff::reader::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
42use tor_async_utils::rate_limited_writer::{
43    DynamicRateLimitedWriter, RateLimitedWriter, RateLimitedWriterConfig,
44};
45use tor_basic_utils::skip_fmt;
46use tor_cell::relaycell::msg::Data;
47use tor_error::internal;
48use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
49
/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
///
/// This is the concrete type obtained by calling [`StreamExt::fuse`] on a
/// `watch::Receiver<StreamRateLimit>` and then [`StreamExt::map`]ping every
/// [`StreamRateLimit`] to a [`RateLimitedWriterConfig`] through a plain function pointer
/// (a named `fn` item cast to `fn(_) -> _`, so that the type can be spelled out here).
///
/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
/// [`DataWriter`], which leaves us with this ugly type.
/// We use a type alias to make `DataWriter` a little nicer.
type RateConfigStream = futures::stream::Map<
    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
    fn(StreamRateLimit) -> RateLimitedWriterConfig,
>;
59
/// An anonymized stream over the Tor network.
///
/// For most purposes, you can think of this type as an anonymized
/// TCP stream: it can read and write data, and get closed when it's done.
///
/// [`DataStream`] implements [`futures::io::AsyncRead`] and
/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
/// traits are expected.
///
/// # Examples
///
/// Connecting to an HTTP server and sending a request, using
/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
///
/// ```ignore
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
///
/// use futures::io::AsyncWriteExt;
///
/// stream
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
///     .await?;
///
/// // Flushing the stream is important; see below!
/// stream.flush().await?;
/// ```
///
/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
///
/// ```ignore
/// use futures::io::AsyncReadExt;
///
/// let mut buf = Vec::new();
/// stream.read_to_end(&mut buf).await?;
///
/// println!("{}", String::from_utf8_lossy(&buf));
/// ```
///
/// # Usage with Tokio
///
/// If the `tokio` crate feature is enabled, this type also implements
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
/// with code that expects those traits.
///
/// # Remember to call `flush`!
///
/// DataStream buffers data internally, in order to write as few cells
/// as possible onto the network.  In order to make sure that your
/// data has actually been sent, you need to make sure that
/// [`AsyncWrite::poll_flush`] runs to completion: probably via
/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
///
/// # Splitting the type
///
/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
/// [`DataStream::split`] method can be used to split it into those two parts, for more
/// convenient usage with e.g. stream combinators.
///
/// # How long does a stream live?
///
/// A `DataStream` will live until all references to it are dropped,
/// or until it is closed explicitly.
///
/// If you split the stream into a `DataReader` and a `DataWriter`, it
/// will survive until _both_ are dropped, or until it is closed
/// explicitly.
///
/// A stream can also close because of a network error,
/// or because the other side of the stream decided to close it.
///
// # Semver note
//
// Note that this type is re-exported as a part of the public API of
// the `arti-client` crate.  Any changes to its API here in
// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataStream {
    /// Underlying writer for this stream
    w: DataWriter,
    /// Underlying reader for this stream
    r: DataReader,
    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    ///
    /// Set to `None` if this is not a client stream.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Option<Arc<ClientDataStreamCtrl>>,
}
// Compile-time check that a `DataStream` can be sent to, and shared between, threads.
assert_impl_all! { DataStream: Send, Sync }
150
/// An object used to control and monitor a data stream.
///
/// # Notes
///
/// This is a separate type from [`DataStream`] because it's useful to have
/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[cfg_attr(
    feature = "rpc",
    derive(derive_deftly::Deftly),
    derive_deftly(tor_rpcbase::templates::Object)
)]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The tunnel to which this stream is attached.
    ///
    /// The stream's writer half owns a `StreamTarget`, which (for a client stream)
    /// holds a strong `Arc` reference to this same `ClientTunnel`.  So as long as
    /// that half is alive, this reference can be upgraded.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak tunnels.
    tunnel: Weak<ClientTunnel>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    //
    // (No per-field `cfg` is needed: this whole type only exists with "stream-ctrl".)
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    _memquota: StreamAccount,
}
189
/// The inner writer for [`DataWriter`].
///
/// This type is responsible for taking bytes and packaging them into cells.
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
#[derive(Debug)]
struct DataWriterInner {
    /// Internal state for this writer
    ///
    /// This is stored in an Option so that we can mutate it in the
    /// AsyncWrite functions.  It might be possible to do better here,
    /// and we should refactor if so.
    ///
    /// The poll functions temporarily `take()` the state (leaving `None`)
    /// and must put a state back before they return.
    state: Option<DataWriterState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional; see `DataReaderInner::_memquota`.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    ///
    /// Set to `None` if this is not a client stream.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Option<Arc<ClientDataStreamCtrl>>,
}
216
/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
///
/// See the [`DataStream`] docs for more information. In particular, note
/// that this writer requires `poll_flush` to complete in order to guarantee that
/// all data has been written.
///
/// # Usage with Tokio
///
/// If the `tokio` crate feature is enabled, this type also implements
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
/// with code that expects that trait.
///
/// # Drop and close
///
/// Note that dropping a `DataWriter` has no special effect on its own:
/// if the `DataWriter` is dropped, the underlying stream will still remain open
/// until the `DataReader` is also dropped.
///
/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
///
/// Remember that Tor does not support half-open streams:
/// If you `close` or `shutdown` a stream,
/// the other side will not see the stream as half-open,
/// and so will (probably) not finish sending you any in-progress data.
/// Do not use `close`/`shutdown` to communicate anything besides
/// "I am done using this stream."
///
// # Semver note
//
// Note that this type is re-exported as a part of the public API of
// the `arti-client` crate.  Any changes to its API here in
// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataWriter {
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
    ///
    /// The limit is reconfigured from a [`RateConfigStream`] and timed with a
    /// [`DynTimeProvider`].
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
}
255
256impl DataWriter {
257    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
258    fn new(
259        inner: DataWriterInner,
260        rate_limit_updates: watch::Receiver<StreamRateLimit>,
261        time_provider: DynTimeProvider,
262    ) -> Self {
263        /// Converts a `rate` into a `RateLimitedWriterConfig`.
264        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
265            let rate = rate.bytes_per_sec();
266            RateLimitedWriterConfig {
267                rate,        // bytes per second
268                burst: rate, // bytes
269                // This number is chosen arbitrarily, but the idea is that we want to balance
270                // between throughput and latency. Assume the user tries to write a large buffer
271                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
272                // frequently and writing a small number of bytes each time to the
273                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
274                // too large (for example 510), we'll be waking up infrequently to write a larger
275                // number of bytes each time. So even if the `DataWriterInner` has almost a full
276                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
277                // bytes before a cell can be sent, it will block until the rate limiter allows 510
278                // more bytes.
279                //
280                // TODO(arti#2028): Is there an optimal value here?
281                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
282            }
283        }
284
285        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
286        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();
287
288        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
289        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);
290
291        // build the rate limiter
292        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
293        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);
294
295        Self { writer }
296    }
297
298    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
299    /// interact with this stream without holding the stream itself.
300    ///
301    /// Returns `None` if this is not a client stream.
302    #[cfg(feature = "stream-ctrl")]
303    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
304        self.writer.inner().client_stream_ctrl()
305    }
306}
307
308impl AsyncWrite for DataWriter {
309    fn poll_write(
310        mut self: Pin<&mut Self>,
311        cx: &mut Context<'_>,
312        buf: &[u8],
313    ) -> Poll<IoResult<usize>> {
314        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
315    }
316
317    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
318        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
319    }
320
321    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
322        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
323    }
324}
325
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    // Each call wraps the pinned futures-io writer in a short-lived
    // `tokio_util` compat adapter and forwards to the tokio trait on it.

    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
340
/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
///
/// See the [`DataStream`] docs for more information.
///
/// # Usage with Tokio
///
/// If the `tokio` crate feature is enabled, this type also implements
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
/// with code that expects that trait.
//
// # Semver note
//
// Note that this type is re-exported as a part of the public API of
// the `arti-client` crate.  Any changes to its API here in
// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataReader {
    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
    ///
    /// All reads go through this wrapper; the inner reader is reached via
    /// `inner()` / `inner_mut()`.
    reader: XonXoffReader<DataReaderInner>,
}
361
362impl DataReader {
363    /// Create a new [`DataReader`].
364    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
365        Self {
366            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
367        }
368    }
369
370    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
371    /// interact with this stream without holding the stream itself.
372    ///
373    /// Returns `None` if this is not a client stream.
374    #[cfg(feature = "stream-ctrl")]
375    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
376        self.reader.inner().client_stream_ctrl()
377    }
378}
379
380impl AsyncRead for DataReader {
381    fn poll_read(
382        mut self: Pin<&mut Self>,
383        cx: &mut Context<'_>,
384        buf: &mut [u8],
385    ) -> Poll<IoResult<usize>> {
386        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
387    }
388
389    fn poll_read_vectored(
390        mut self: Pin<&mut Self>,
391        cx: &mut Context<'_>,
392        bufs: &mut [std::io::IoSliceMut<'_>],
393    ) -> Poll<IoResult<usize>> {
394        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
395    }
396}
397
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    /// Read via a temporary `tokio_util` compat adapter around the pinned
    /// futures-io reader.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
408
/// The inner reader for [`DataReader`].
///
/// This type is responsible for taking stream messages and extracting the stream data from them.
/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
#[derive(Debug)]
pub(crate) struct DataReaderInner {
    /// Internal state for this reader.
    ///
    /// This is stored in an Option so that we can mutate it in
    /// poll_read().  It might be possible to do better here, and we
    /// should refactor if so.
    ///
    /// Code that temporarily takes the state out must always put it back;
    /// readers of this field `expect` it to be `Some`.
    state: Option<DataReaderState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
    // since ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    ///
    /// Set to `None` if this is not a client stream.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Option<Arc<ClientDataStreamCtrl>>,
}
436
437impl BufferIsEmpty for DataReaderInner {
438    /// The result will become stale,
439    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
440    fn is_empty(mut self: Pin<&mut Self>) -> bool {
441        match self
442            .state
443            .as_mut()
444            .expect("forgot to put `DataReaderState` back")
445        {
446            DataReaderState::Open(imp) => {
447                // check if the partial cell in `pending` is empty,
448                // and if the message stream is empty
449                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
450            }
451            // closed, so any data should have been discarded
452            DataReaderState::Closed => true,
453        }
454    }
455}
456
/// Shared status flags for tracking the status of a `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
489
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Note that a CONNECTED message has arrived.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Note that the stream hit an error of some kind.
    ///
    /// An END with reason `DONE` counts only as an END; any other END counts
    /// as both an END and an error; anything else counts only as an error.
    /// Flags are only ever set here, never cleared.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        let (is_end, is_err) = match e {
            Error::EndReceived(EndReason::DONE) => (true, false),
            Error::EndReceived(_) => (true, true),
            _ => (false, true),
        };
        self.received_end |= is_end;
        self.received_err |= is_err;
    }
}
512
// Declares `ClientDataStreamMsg`: the restricted set of relay messages
// (DATA, END, CONNECTED) that a client data stream will accept.
restricted_msg! {
    /// An allowable incoming message on a client data stream.
    enum ClientDataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
520
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    /// Try to get a strong handle to the tunnel; yields `None` once the
    /// tunnel has been dropped everywhere else.
    fn tunnel(&self) -> Option<Arc<ClientTunnel>> {
        Weak::upgrade(&self.tunnel)
    }
}
529
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Report whether the underlying stream is connected: a `CONNECTED`
    /// message has arrived, and the stream has not since been closed by an
    /// END (in either direction) or by an error.
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
542
543impl DataStream {
544    /// Wrap raw stream receiver and target parts as a DataStream.
545    ///
546    /// For non-optimistic stream, function `wait_for_connection`
547    /// must be called after to make sure CONNECTED is received.
548    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
549        time_provider: P,
550        receiver: StreamReceiver,
551        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
552        target: StreamTarget,
553        memquota: StreamAccount,
554    ) -> Self {
555        Self::new_inner(
556            time_provider,
557            receiver,
558            xon_xoff_reader_ctrl,
559            target,
560            false,
561            memquota,
562        )
563    }
564
565    /// Wrap raw stream receiver and target parts as a connected DataStream.
566    ///
567    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
568    /// CONNECTED cell.
569    ///
570    /// This is used by hidden services, exit relays, and directory servers to accept streams.
571    #[cfg(any(feature = "hs-service", feature = "relay"))]
572    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
573        time_provider: P,
574        receiver: StreamReceiver,
575        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
576        target: StreamTarget,
577        memquota: StreamAccount,
578    ) -> Self {
579        Self::new_inner(
580            time_provider,
581            receiver,
582            xon_xoff_reader_ctrl,
583            target,
584            true,
585            memquota,
586        )
587    }
588
589    /// The shared implementation of the `new*()` functions.
590    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
591        time_provider: P,
592        receiver: StreamReceiver,
593        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
594        target: StreamTarget,
595        connected: bool,
596        memquota: StreamAccount,
597    ) -> Self {
598        let relay_cell_format = target.relay_cell_format();
599        let out_buf_len = Data::max_body_len(relay_cell_format);
600        let rate_limit_stream = target.rate_limit_stream().clone();
601
602        #[cfg(feature = "stream-ctrl")]
603        let status = {
604            let mut data_stream_status = DataStreamStatus::default();
605            if connected {
606                data_stream_status.record_connected();
607            }
608            Arc::new(Mutex::new(data_stream_status))
609        };
610
611        #[cfg(feature = "stream-ctrl")]
612        let ctrl = {
613            let tunnel = match target.tunnel() {
614                crate::stream::Tunnel::Client(t) => Some(Arc::downgrade(t)),
615                #[cfg(feature = "relay")]
616                crate::stream::Tunnel::Relay(_) => None,
617            };
618
619            tunnel.map(|tunnel| {
620                Arc::new(ClientDataStreamCtrl {
621                    tunnel,
622                    status: status.clone(),
623                    _memquota: memquota.clone(),
624                })
625            })
626        };
627        let r = DataReaderInner {
628            state: Some(DataReaderState::Open(DataReaderImpl {
629                s: receiver,
630                pending: Vec::new(),
631                offset: 0,
632                connected,
633                #[cfg(feature = "stream-ctrl")]
634                status: status.clone(),
635            })),
636            _memquota: memquota.clone(),
637            #[cfg(feature = "stream-ctrl")]
638            ctrl: ctrl.clone(),
639        };
640        let w = DataWriterInner {
641            state: Some(DataWriterState::Ready(DataWriterImpl {
642                s: target,
643                buf: vec![0; out_buf_len].into_boxed_slice(),
644                n_pending: 0,
645                #[cfg(feature = "stream-ctrl")]
646                status,
647                relay_cell_format,
648            })),
649            _memquota: memquota,
650            #[cfg(feature = "stream-ctrl")]
651            ctrl: ctrl.clone(),
652        };
653
654        let time_provider = DynTimeProvider::new(time_provider);
655
656        DataStream {
657            w: DataWriter::new(w, rate_limit_stream, time_provider),
658            r: DataReader::new(r, xon_xoff_reader_ctrl),
659            #[cfg(feature = "stream-ctrl")]
660            ctrl,
661        }
662    }
663
664    /// Divide this DataStream into its constituent parts.
665    pub fn split(self) -> (DataReader, DataWriter) {
666        (self.r, self.w)
667    }
668
669    /// Wait until a CONNECTED cell is received, or some other cell
670    /// is received to indicate an error.
671    ///
672    /// Does nothing if this stream is already connected.
673    pub async fn wait_for_connection(&mut self) -> Result<()> {
674        // We must put state back before returning
675        let state = self
676            .r
677            .reader
678            .inner_mut()
679            .state
680            .take()
681            .expect("Missing state in DataReaderInner");
682
683        if let DataReaderState::Open(mut imp) = state {
684            let result = if imp.connected {
685                Ok(())
686            } else {
687                // This succeeds if the cell is CONNECTED, and fails otherwise.
688                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
689            };
690            self.r.reader.inner_mut().state = Some(match result {
691                Err(_) => DataReaderState::Closed,
692                Ok(_) => DataReaderState::Open(imp),
693            });
694            result
695        } else {
696            Err(Error::from(internal!(
697                "Expected ready state, got {:?}",
698                state
699            )))
700        }
701    }
702
703    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
704    /// interact with this stream without holding the stream itself.
705    #[cfg(feature = "stream-ctrl")]
706    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
707        self.ctrl.as_ref()
708    }
709}
710
711impl AsyncRead for DataStream {
712    fn poll_read(
713        mut self: Pin<&mut Self>,
714        cx: &mut Context<'_>,
715        buf: &mut [u8],
716    ) -> Poll<IoResult<usize>> {
717        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
718    }
719}
720
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    /// Read via a temporary `tokio_util` compat adapter around the pinned
    /// futures-io stream.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
731
732impl AsyncWrite for DataStream {
733    fn poll_write(
734        mut self: Pin<&mut Self>,
735        cx: &mut Context<'_>,
736        buf: &[u8],
737    ) -> Poll<IoResult<usize>> {
738        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
739    }
740    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
741        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
742    }
743    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
744        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
745    }
746}
747
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    // Each call wraps the pinned futures-io stream in a short-lived
    // `tokio_util` compat adapter and forwards to the tokio trait on it.

    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
762
/// Helper type: Like BoxFuture, but also requires that the future be Sync.
///
/// A pinned, heap-allocated trait-object future that is `Send + Sync` and lives
/// for `'a`, resolving to `T`.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
765
/// An enumeration for the state of a DataWriter.
///
/// We have to use an enum here because, for as long as we're waiting
/// for a flush operation to complete, the future returned by
/// `DataWriterImpl::flush_buf()` owns the DataWriterImpl.
#[derive(Educe)]
#[educe(Debug)]
enum DataWriterState {
    /// The writer has closed or gotten an error: nothing more to do.
    Closed,
    /// The writer is not currently flushing; more data can get queued
    /// immediately.
    Ready(DataWriterImpl),
    /// The writer is flushing a cell.
    ///
    /// The future hands the `DataWriterImpl` back, together with the result
    /// of the flush, once it completes.  It is omitted from `Debug` output.
    Flushing(
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
    ),
}
785
/// Internal: the write part of a DataStream
///
/// Bytes are accumulated in `buf` (the first `n_pending` of them are live)
/// until they are flushed to `s`.
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    n_pending: usize,

    /// Relay cell format in use
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
815
816impl DataWriterInner {
817    /// See [`DataWriter::client_stream_ctrl`].
818    #[cfg(feature = "stream-ctrl")]
819    fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
820        self.ctrl.as_ref()
821    }
822
823    /// Helper for poll_flush() and poll_close(): Performs a flush, then
824    /// closes the stream if should_close is true.
825    fn poll_flush_impl(
826        mut self: Pin<&mut Self>,
827        cx: &mut Context<'_>,
828        should_close: bool,
829    ) -> Poll<IoResult<()>> {
830        let state = self.state.take().expect("Missing state in DataWriter");
831
832        // TODO: this whole function is a bit copy-pasted.
833        let mut future: BoxSyncFuture<_> = match state {
834            DataWriterState::Ready(imp) => {
835                if imp.n_pending == 0 {
836                    // Nothing to flush!
837                    if should_close {
838                        // We need to actually continue with this function to do the closing.
839                        // Thus, make a future that does nothing and is ready immediately.
840                        Box::pin(futures::future::ready((imp, Ok(()))))
841                    } else {
842                        // There's nothing more to do; we can return.
843                        self.state = Some(DataWriterState::Ready(imp));
844                        return Poll::Ready(Ok(()));
845                    }
846                } else {
847                    // We need to flush the buffer's contents; Make a future for that.
848                    Box::pin(imp.flush_buf())
849                }
850            }
851            DataWriterState::Flushing(fut) => fut,
852            DataWriterState::Closed => {
853                self.state = Some(DataWriterState::Closed);
854                return Poll::Ready(Err(Error::NotConnected.into()));
855            }
856        };
857
858        match future.as_mut().poll(cx) {
859            Poll::Ready((_imp, Err(e))) => {
860                self.state = Some(DataWriterState::Closed);
861                Poll::Ready(Err(e.into()))
862            }
863            Poll::Ready((mut imp, Ok(()))) => {
864                if should_close {
865                    // Tell the StreamTarget to close, so that the reactor
866                    // realizes that we are done sending. (Dropping `imp.s` does not
867                    // suffice, since there may be other clones of it.  In particular,
868                    // the StreamReceiver has one, which it uses to keep the stream
869                    // open, among other things.)
870                    imp.s.close();
871
872                    #[cfg(feature = "stream-ctrl")]
873                    {
874                        // TODO RPC:  This is not sufficient to track every case
875                        // where we might have sent an End.  See note on the
876                        // `sent_end` field.
877                        imp.status.lock().expect("lock poisoned").sent_end = true;
878                    }
879                    self.state = Some(DataWriterState::Closed);
880                } else {
881                    self.state = Some(DataWriterState::Ready(imp));
882                }
883                Poll::Ready(Ok(()))
884            }
885            Poll::Pending => {
886                self.state = Some(DataWriterState::Flushing(future));
887                Poll::Pending
888            }
889        }
890    }
891}
892
893impl AsyncWrite for DataWriterInner {
894    fn poll_write(
895        mut self: Pin<&mut Self>,
896        cx: &mut Context<'_>,
897        buf: &[u8],
898    ) -> Poll<IoResult<usize>> {
899        if buf.is_empty() {
900            return Poll::Ready(Ok(0));
901        }
902
903        let state = self.state.take().expect("Missing state in DataWriter");
904
905        let mut future = match state {
906            DataWriterState::Ready(mut imp) => {
907                let n_queued = imp.queue_bytes(buf);
908                if n_queued != 0 {
909                    self.state = Some(DataWriterState::Ready(imp));
910                    return Poll::Ready(Ok(n_queued));
911                }
912                // we couldn't queue anything, so the current cell must be full.
913                Box::pin(imp.flush_buf())
914            }
915            DataWriterState::Flushing(fut) => fut,
916            DataWriterState::Closed => {
917                self.state = Some(DataWriterState::Closed);
918                return Poll::Ready(Err(Error::NotConnected.into()));
919            }
920        };
921
922        match future.as_mut().poll(cx) {
923            Poll::Ready((_imp, Err(e))) => {
924                #[cfg(feature = "stream-ctrl")]
925                {
926                    _imp.status.lock().expect("lock poisoned").record_error(&e);
927                }
928                self.state = Some(DataWriterState::Closed);
929                Poll::Ready(Err(e.into()))
930            }
931            Poll::Ready((mut imp, Ok(()))) => {
932                // Great!  We're done flushing.  Queue as much as we can of this
933                // cell.
934                let n_queued = imp.queue_bytes(buf);
935                self.state = Some(DataWriterState::Ready(imp));
936                Poll::Ready(Ok(n_queued))
937            }
938            Poll::Pending => {
939                self.state = Some(DataWriterState::Flushing(future));
940                Poll::Pending
941            }
942        }
943    }
944
945    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
946        self.poll_flush_impl(cx, false)
947    }
948
949    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
950        self.poll_flush_impl(cx, true)
951    }
952}
953
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    /// Forward to the `futures`-style writer through a `tokio_util` compat adapter.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    /// Forward to the `futures`-style flush through a compat adapter.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    /// Forward to the `futures`-style close through a compat adapter.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
968
969impl DataWriterImpl {
970    /// Try to flush the current buffer contents as a data cell.
971    async fn flush_buf(mut self) -> (Self, Result<()>) {
972        let result = if let Some((cell, remainder)) =
973            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
974        {
975            // TODO: Eventually we may want a larger buffer; if we do,
976            // this invariant will become false.
977            assert!(remainder.is_empty());
978            self.n_pending = 0;
979            self.s.send(cell.into()).await
980        } else {
981            Ok(())
982        };
983
984        (self, result)
985    }
986
987    /// Add as many bytes as possible from `b` to our internal buffer;
988    /// return the number we were able to add.
989    fn queue_bytes(&mut self, b: &[u8]) -> usize {
990        let empty_space = &mut self.buf[self.n_pending..];
991        if empty_space.is_empty() {
992            // that is, len == 0
993            return 0;
994        }
995
996        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
997        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
998        self.n_pending += n_to_copy;
999        n_to_copy
1000    }
1001}
1002
1003impl DataReaderInner {
1004    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
1005    /// interact with this stream without holding the stream itself.
1006    #[cfg(feature = "stream-ctrl")]
1007    pub(crate) fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
1008        self.ctrl.as_ref()
1009    }
1010}
1011
/// An enumeration for the state of a [`DataReaderInner`].
///
/// The reader is either open (and owns the [`DataReaderImpl`] that buffers
/// incoming data) or permanently closed.
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    ///
    /// `poll_read` returns `Error::NotConnected` for any read attempted in
    /// this state.
    Closed,
    /// In this state the reader is open, and the wrapped `DataReaderImpl`
    /// holds the receiver and any not-yet-returned data.
    Open(DataReaderImpl),
}
1027
/// Wrapper for the read part of a [`DataStream`].
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object, polled for incoming messages.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,

    /// Data that we received on this stream but have not yet been able to
    /// send to the caller. Bytes before `offset` have already been returned.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    ///
    /// When `offset == pending.len()`, there is nothing buffered to return.
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    ///
    /// DATA cells received while this is false are rejected as a protocol error.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
1055
1056impl AsyncRead for DataReaderInner {
1057    fn poll_read(
1058        mut self: Pin<&mut Self>,
1059        cx: &mut Context<'_>,
1060        buf: &mut [u8],
1061    ) -> Poll<IoResult<usize>> {
1062        // We're pulling the state object out of the reader.  We MUST
1063        // put it back before this function returns.
1064        let mut state = self.state.take().expect("Missing state in DataReaderInner");
1065
1066        loop {
1067            let mut imp = match state {
1068                DataReaderState::Open(mut imp) => {
1069                    // There may be data to read already.
1070                    let n_copied = imp.extract_bytes(buf);
1071                    if n_copied != 0 || buf.is_empty() {
1072                        // We read data into the buffer, or the buffer was 0-len to begin with.
1073                        // Tell the caller.
1074                        self.state = Some(DataReaderState::Open(imp));
1075                        return Poll::Ready(Ok(n_copied));
1076                    }
1077
1078                    // No data available!  We have to try reading.
1079                    imp
1080                }
1081                DataReaderState::Closed => {
1082                    // TODO: Why are we returning an error rather than continuing to return EOF?
1083                    self.state = Some(DataReaderState::Closed);
1084                    return Poll::Ready(Err(Error::NotConnected.into()));
1085                }
1086            };
1087
1088            // See if a cell is ready.
1089            match Pin::new(&mut imp).read_cell(cx) {
1090                Poll::Ready(Err(e)) => {
1091                    // There aren't any survivable errors in the current
1092                    // design.
1093                    self.state = Some(DataReaderState::Closed);
1094                    #[cfg(feature = "stream-ctrl")]
1095                    {
1096                        imp.status.lock().expect("lock poisoned").record_error(&e);
1097                    }
1098                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
1099                        Ok(0)
1100                    } else {
1101                        Err(e.into())
1102                    };
1103                    return Poll::Ready(result);
1104                }
1105                Poll::Ready(Ok(())) => {
1106                    // It read a cell!  Continue the loop.
1107                    state = DataReaderState::Open(imp);
1108                }
1109                Poll::Pending => {
1110                    // No cells ready, so tell the
1111                    // caller to get back to us later.
1112                    self.state = Some(DataReaderState::Open(imp));
1113                    return Poll::Pending;
1114                }
1115            }
1116        }
1117    }
1118}
1119
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    /// Read into a tokio `ReadBuf` by adapting this reader's `futures`-style
    /// `AsyncRead` implementation through a `tokio_util` compat wrapper.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
1130
1131impl DataReaderImpl {
1132    /// Pull as many bytes as we can off of self.pending, and return that
1133    /// number of bytes.
1134    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
1135        let remainder = &self.pending[self.offset..];
1136        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
1137        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
1138        self.offset += n_to_copy;
1139
1140        n_to_copy
1141    }
1142
1143    /// Return true iff there are no buffered bytes here to yield
1144    fn buf_is_empty(&self) -> bool {
1145        self.pending.len() == self.offset
1146    }
1147
1148    /// Load self.pending with the contents of a new data cell.
1149    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1150        use ClientDataStreamMsg::*;
1151        let msg = match self.as_mut().project().s.poll_next(cx) {
1152            Poll::Pending => return Poll::Pending,
1153            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<ClientDataStreamMsg>() {
1154                Ok(cell) => cell.into_msg(),
1155                Err(e) => {
1156                    self.s.protocol_error();
1157                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
1158                }
1159            },
1160            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
1161            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
1162            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
1163            // terminated, we should be returning `None` here and not considering it as an error.
1164            // The `StreamReceiver` will have already returned an error if the cell stream was
1165            // terminated without an END message.
1166            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
1167        };
1168
1169        let result = match msg {
1170            Connected(_) if !self.connected => {
1171                self.connected = true;
1172                #[cfg(feature = "stream-ctrl")]
1173                {
1174                    self.status
1175                        .lock()
1176                        .expect("poisoned lock")
1177                        .record_connected();
1178                }
1179                Ok(())
1180            }
1181            Connected(_) => {
1182                self.s.protocol_error();
1183                Err(Error::StreamProto(
1184                    "Received a second connect cell on a data stream".to_string(),
1185                ))
1186            }
1187            Data(d) if self.connected => {
1188                self.add_data(d.into());
1189                Ok(())
1190            }
1191            Data(_) => {
1192                self.s.protocol_error();
1193                Err(Error::StreamProto(
1194                    "Received a data cell an unconnected stream".to_string(),
1195                ))
1196            }
1197            End(e) => Err(Error::EndReceived(e.reason())),
1198        };
1199
1200        Poll::Ready(result)
1201    }
1202
1203    /// Add the data from `d` to the end of our pending bytes.
1204    fn add_data(&mut self, mut d: Vec<u8>) {
1205        if self.buf_is_empty() {
1206            // No data pending?  Just take d as the new pending.
1207            self.pending = d;
1208            self.offset = 0;
1209        } else {
1210            // TODO(nickm) This has potential to grow `pending` without bound.
1211            // Fortunately, we don't currently read cells or call this
1212            // `add_data` method when pending is nonempty—but if we do in the
1213            // future, we'll have to be careful here.
1214            self.pending.append(&mut d);
1215        }
1216    }
1217}
1218
/// A `CmdChecker` that enforces invariants for outbound data streams.
///
/// Its only state is whether the initial CONNECTED message is still outstanding.
#[derive(Debug)]
pub(crate) struct OutboundDataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    expecting_connected: bool,
}

impl Default for OutboundDataCmdChecker {
    /// A fresh checker: no CONNECTED has been seen yet, so one is expected.
    fn default() -> Self {
        let expecting_connected = true;
        Self { expecting_connected }
    }
}
1233
1234impl CmdChecker for OutboundDataCmdChecker {
1235    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
1236        use StreamStatus::*;
1237        match msg.cmd() {
1238            RelayCmd::CONNECTED => {
1239                if !self.expecting_connected {
1240                    Err(Error::StreamProto(
1241                        "Received CONNECTED twice on a stream.".into(),
1242                    ))
1243                } else {
1244                    self.expecting_connected = false;
1245                    Ok(Open)
1246                }
1247            }
1248            RelayCmd::DATA => {
1249                if !self.expecting_connected {
1250                    Ok(Open)
1251                } else {
1252                    Err(Error::StreamProto(
1253                        "Received DATA before CONNECTED on a stream".into(),
1254                    ))
1255                }
1256            }
1257            RelayCmd::END => Ok(Closed),
1258            _ => Err(Error::StreamProto(format!(
1259                "Unexpected {} on a data stream!",
1260                msg.cmd()
1261            ))),
1262        }
1263    }
1264
1265    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1266        let _ = msg
1267            .decode::<ClientDataStreamMsg>()
1268            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1269        Ok(())
1270    }
1271}
1272
1273impl OutboundDataCmdChecker {
1274    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
1275    /// constructed connection.
1276    pub(crate) fn new_any() -> AnyCmdChecker {
1277        Box::<Self>::default()
1278    }
1279}