tor_proto/stream/data.rs
//! Declare DataStream, a type that wraps a stream's raw reader and target parts
//! (`StreamReader` and `StreamTarget`) so as to be useful for byte-oriented
//! communication.
3
4use crate::{Error, Result};
5use static_assertions::assert_impl_all;
6use tor_cell::relaycell::msg::EndReason;
7use tor_cell::relaycell::RelayCmd;
8
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::task::{Context, Poll};
11use futures::Future;
12
13#[cfg(feature = "tokio")]
14use tokio_crate::io::ReadBuf;
15#[cfg(feature = "tokio")]
16use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
17#[cfg(feature = "tokio")]
18use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
19use tor_cell::restricted_msg;
20
21use std::fmt::Debug;
22use std::io::Result as IoResult;
23use std::pin::Pin;
24#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
25use std::sync::Arc;
26#[cfg(feature = "stream-ctrl")]
27use std::sync::{Mutex, Weak};
28
29use educe::Educe;
30
31#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
32use crate::tunnel::circuit::ClientCirc;
33
34use crate::memquota::StreamAccount;
35use crate::stream::StreamReader;
36use crate::tunnel::StreamTarget;
37use tor_basic_utils::skip_fmt;
38use tor_cell::relaycell::msg::Data;
39use tor_error::internal;
40
41use super::AnyCmdChecker;
42
43/// An anonymized stream over the Tor network.
44///
45/// For most purposes, you can think of this type as an anonymized
46/// TCP stream: it can read and write data, and get closed when it's done.
47///
48/// [`DataStream`] implements [`futures::io::AsyncRead`] and
49/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
50/// traits are expected.
51///
52/// # Examples
53///
54/// Connecting to an HTTP server and sending a request, using
55/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
56///
57/// ```ignore
58/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
59///
60/// use futures::io::AsyncWriteExt;
61///
62/// stream
63/// .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
64/// .await?;
65///
66/// // Flushing the stream is important; see below!
67/// stream.flush().await?;
68/// ```
69///
70/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
71///
72/// ```ignore
73/// use futures::io::AsyncReadExt;
74///
75/// let mut buf = Vec::new();
76/// stream.read_to_end(&mut buf).await?;
77///
78/// println!("{}", String::from_utf8_lossy(&buf));
79/// ```
80///
81/// # Usage with Tokio
82///
83/// If the `tokio` crate feature is enabled, this type also implements
84/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
85/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
86/// with code that expects those traits.
87///
88/// # Remember to call `flush`!
89///
90/// DataStream buffers data internally, in order to write as few cells
91/// as possible onto the network. In order to make sure that your
92/// data has actually been sent, you need to make sure that
93/// [`AsyncWrite::poll_flush`] runs to completion: probably via
94/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
95///
96/// # Splitting the type
97///
98/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
99/// `DataStream::split` method can be used to split it into those two parts, for more
100/// convenient usage with e.g. stream combinators.
101///
102/// # How long does a stream live?
103///
104/// A `DataStream` will live until all references to it are dropped,
105/// or until it is closed explicitly.
106///
107/// If you split the stream into a `DataReader` and a `DataWriter`, it
108/// will survive until _both_ are dropped, or until it is closed
109/// explicitly.
110///
111/// A stream can also close because of a network error,
112/// or because the other side of the stream decided to close it.
113///
114// # Semver note
115//
116// Note that this type is re-exported as a part of the public API of
117// the `arti-client` crate. Any changes to its API here in
118// `tor-proto` need to be reflected above.
119#[derive(Debug)]
120pub struct DataStream {
121 /// Underlying writer for this stream
122 w: DataWriter,
123 /// Underlying reader for this stream
124 r: DataReader,
125 /// A control object that can be used to monitor and control this stream
126 /// without needing to own it.
127 #[cfg(feature = "stream-ctrl")]
128 ctrl: std::sync::Arc<ClientDataStreamCtrl>,
129}
130assert_impl_all! { DataStream: Send, Sync }
131
/// A handle for monitoring and controlling a data stream without owning it.
///
/// # Notes
///
/// This lives apart from [`DataStream`] because it is convenient to hand out
/// many references to it. By contrast, each [`DataReader`] and [`DataWriter`]
/// must have exactly one owner for the `AsyncRead` and `AsyncWrite` APIs to
/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit that carries this stream.
    ///
    /// The stream's reader and writer halves each hold a `StreamTarget`, which
    /// in turn keeps a strong reference to the `ClientCirc`. So while either
    /// of them is alive, this reference remains valid.
    ///
    /// It is deliberately `Weak`: once the stream itself is closed, keeping
    /// this control object around must not leak the circuit.
    // TODO(conflux): use ClientTunnel
    circuit: Weak<ClientCirc>,

    /// Shared, user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    //
    // (No per-field `cfg` is needed: this whole struct is already gated on
    // the `stream-ctrl` feature.)
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data.
    ///
    /// Held only to keep the account alive.
    _memquota: StreamAccount,
}
166
167/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
168///
169/// See the [`DataStream`] docs for more information. In particular, note
170/// that this writer requires `poll_flush` to complete in order to guarantee that
171/// all data has been written.
172///
173/// # Usage with Tokio
174///
175/// If the `tokio` crate feature is enabled, this type also implements
176/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
177/// with code that expects that trait.
178///
179/// # Drop and close
180///
181/// Note that dropping a `DataWriter` has no special effect on its own:
182/// if the `DataWriter` is dropped, the underlying stream will still remain open
183/// until the `DataReader` is also dropped.
184///
185/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
186/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
187///
188/// Remember that Tor does not support half-open streams:
189/// If you `close` or `shutdown` a stream,
190/// the other side will not see the stream as half-open,
191/// and so will (probably) not finish sending you any in-progress data.
192/// Do not use `close`/`shutdown` to communicate anything besides
193/// "I am done using this stream."
194///
195// # Semver note
196//
197// Note that this type is re-exported as a part of the public API of
198// the `arti-client` crate. Any changes to its API here in
199// `tor-proto` need to be reflected above.
200#[derive(Debug)]
201pub struct DataWriter {
202 /// Internal state for this writer
203 ///
204 /// This is stored in an Option so that we can mutate it in the
205 /// AsyncWrite functions. It might be possible to do better here,
206 /// and we should refactor if so.
207 state: Option<DataWriterState>,
208
209 /// The memory quota account that should be used for this stream's data
210 ///
211 /// Exists to keep the account alive
212 // If we liked, we could make this conditional; see DataReader.memquota
213 _memquota: StreamAccount,
214
215 /// A control object that can be used to monitor and control this stream
216 /// without needing to own it.
217 #[cfg(feature = "stream-ctrl")]
218 ctrl: std::sync::Arc<ClientDataStreamCtrl>,
219}
220
221/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
222///
223/// See the [`DataStream`] docs for more information.
224///
225/// # Usage with Tokio
226///
227/// If the `tokio` crate feature is enabled, this type also implements
228/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
229/// with code that expects that trait.
230//
231// # Semver note
232//
233// Note that this type is re-exported as a part of the public API of
234// the `arti-client` crate. Any changes to its API here in
235// `tor-proto` need to be reflected above.
236#[derive(Debug)]
237pub struct DataReader {
238 /// Internal state for this reader.
239 ///
240 /// This is stored in an Option so that we can mutate it in
241 /// poll_read(). It might be possible to do better here, and we
242 /// should refactor if so.
243 state: Option<DataReaderState>,
244
245 /// The memory quota account that should be used for this stream's data
246 ///
247 /// Exists to keep the account alive
248 // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
249 // since, ClientDataStreamCtrl contains a StreamAccount clone too. But that seems fragile.
250 _memquota: StreamAccount,
251
252 /// A control object that can be used to monitor and control this stream
253 /// without needing to own it.
254 #[cfg(feature = "stream-ctrl")]
255 ctrl: std::sync::Arc<ClientDataStreamCtrl>,
256}
257
/// Shared status flags that track the state of a `DataStream`.
///
/// We expect to refactor this somewhat, so it is not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere. In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// Set once we have received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl and
    // `expecting_connected` in DataCmdChecker.
    received_connected: bool,
    /// Set once we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module! Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// Set once we have received an END message telling us to close the stream.
    received_end: bool,
    /// Set once we have received an error.
    ///
    /// (This is neither a subset nor a superset of `received_end`: some errors
    /// are END messages and some aren't; some END messages are errors and
    /// some aren't.)
    received_err: bool,
}
291
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Note that a CONNECTED message has arrived.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Note that an error of some kind has occurred.
    ///
    /// An `EndReceived` error always counts as a received END. It also counts
    /// as an error unless its reason is `DONE`. Any other error counts only as
    /// an error. Flags are only ever set here, never cleared.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something. But that means making a redundant copy of the error
        // even if nobody will want it. Do we care?
        let (is_end, is_err) = match e {
            Error::EndReceived(EndReason::DONE) => (true, false),
            Error::EndReceived(_) => (true, true),
            _ => (false, true),
        };
        self.received_end |= is_end;
        self.received_err |= is_err;
    }
}
314
// Incoming relay messages on a data stream are decoded into this restricted
// message type (see `DataReaderImpl::read_cell` and
// `DataCmdChecker::consume_checked_msg`).
restricted_msg! {
    /// An allowable incoming message on a data stream.
    enum DataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
322
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    // TODO(conflux): use ClientTunnel
    fn circuit(&self) -> Option<Arc<ClientCirc>> {
        // `circuit` is only a weak reference, so this yields `None` once no
        // strong references to the circuit remain.
        Weak::upgrade(&self.circuit)
    }
}
332
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected: that is, if it has
    /// received a `CONNECTED` message and has not since been closed (no END
    /// sent or received, and no error received).
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
346impl DataStream {
347 /// Wrap raw stream reader and target parts as a DataStream.
348 ///
349 /// For non-optimistic stream, function `wait_for_connection`
350 /// must be called after to make sure CONNECTED is received.
351 pub(crate) fn new(reader: StreamReader, target: StreamTarget, memquota: StreamAccount) -> Self {
352 Self::new_inner(reader, target, false, memquota)
353 }
354
355 /// Wrap raw stream reader and target parts as a connected DataStream.
356 ///
357 /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
358 /// CONNECTED cell.
359 ///
360 /// This is used by hidden services, exit relays, and directory servers to accept streams.
361 #[cfg(feature = "hs-service")]
362 pub(crate) fn new_connected(
363 reader: StreamReader,
364 target: StreamTarget,
365 memquota: StreamAccount,
366 ) -> Self {
367 Self::new_inner(reader, target, true, memquota)
368 }
369
370 /// The shared implementation of the `new*()` functions.
371 fn new_inner(
372 reader: StreamReader,
373 target: StreamTarget,
374 connected: bool,
375 memquota: StreamAccount,
376 ) -> Self {
377 #[cfg(feature = "stream-ctrl")]
378 let status = {
379 let mut data_stream_status = DataStreamStatus::default();
380 if connected {
381 data_stream_status.record_connected();
382 }
383 Arc::new(Mutex::new(data_stream_status))
384 };
385
386 #[cfg(feature = "stream-ctrl")]
387 let ctrl = Arc::new(ClientDataStreamCtrl {
388 circuit: Arc::downgrade(target.circuit()),
389 status: status.clone(),
390 _memquota: memquota.clone(),
391 });
392 let r = DataReader {
393 state: Some(DataReaderState::Ready(DataReaderImpl {
394 s: reader,
395 pending: Vec::new(),
396 offset: 0,
397 connected,
398 #[cfg(feature = "stream-ctrl")]
399 status: status.clone(),
400 })),
401 _memquota: memquota.clone(),
402 #[cfg(feature = "stream-ctrl")]
403 ctrl: ctrl.clone(),
404 };
405 let w = DataWriter {
406 state: Some(DataWriterState::Ready(DataWriterImpl {
407 s: target,
408 buf: Box::new([0; Data::MAXLEN]),
409 n_pending: 0,
410 #[cfg(feature = "stream-ctrl")]
411 status,
412 })),
413 _memquota: memquota,
414 #[cfg(feature = "stream-ctrl")]
415 ctrl: ctrl.clone(),
416 };
417 DataStream {
418 w,
419 r,
420 #[cfg(feature = "stream-ctrl")]
421 ctrl,
422 }
423 }
424
425 /// Divide this DataStream into its constituent parts.
426 pub fn split(self) -> (DataReader, DataWriter) {
427 (self.r, self.w)
428 }
429
430 /// Wait until a CONNECTED cell is received, or some other cell
431 /// is received to indicate an error.
432 ///
433 /// Does nothing if this stream is already connected.
434 pub async fn wait_for_connection(&mut self) -> Result<()> {
435 // We must put state back before returning
436 let state = self.r.state.take().expect("Missing state in DataReader");
437
438 if let DataReaderState::Ready(imp) = state {
439 let (imp, result) = if imp.connected {
440 (imp, Ok(()))
441 } else {
442 // This succeeds if the cell is CONNECTED, and fails otherwise.
443 imp.read_cell().await
444 };
445 self.r.state = Some(match result {
446 Err(_) => DataReaderState::Closed,
447 Ok(_) => DataReaderState::Ready(imp),
448 });
449 result
450 } else {
451 Err(Error::from(internal!(
452 "Expected ready state, got {:?}",
453 state
454 )))
455 }
456 }
457
458 /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
459 /// interact with this stream without holding the stream itself.
460 #[cfg(feature = "stream-ctrl")]
461 pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
462 Some(&self.ctrl)
463 }
464}
465
466impl AsyncRead for DataStream {
467 fn poll_read(
468 mut self: Pin<&mut Self>,
469 cx: &mut Context<'_>,
470 buf: &mut [u8],
471 ) -> Poll<IoResult<usize>> {
472 AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
473 }
474}
475
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        // Adapt our futures-style reader to tokio's trait.
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
486
487impl AsyncWrite for DataStream {
488 fn poll_write(
489 mut self: Pin<&mut Self>,
490 cx: &mut Context<'_>,
491 buf: &[u8],
492 ) -> Poll<IoResult<usize>> {
493 AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
494 }
495 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
496 AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
497 }
498 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
499 AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
500 }
501}
502
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        // Adapt our futures-style writer to tokio's trait.
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
517
/// Helper type: a pinned, heap-allocated future that is both `Send` and `Sync`.
///
/// This is like `BoxFuture`, with the extra requirement that the future be `Sync`.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
520
521/// An enumeration for the state of a DataWriter.
522///
523/// We have to use an enum here because, for as long as we're waiting
524/// for a flush operation to complete, the future returned by
525/// `flush_cell()` owns the DataWriterImpl.
526#[derive(Educe)]
527#[educe(Debug)]
528enum DataWriterState {
529 /// The writer has closed or gotten an error: nothing more to do.
530 Closed,
531 /// The writer is not currently flushing; more data can get queued
532 /// immediately.
533 Ready(DataWriterImpl),
534 /// The writer is flushing a cell.
535 Flushing(
536 #[educe(Debug(method = "skip_fmt"))] //
537 BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
538 ),
539}
540
541/// Internal: the write part of a DataStream
542#[derive(Educe)]
543#[educe(Debug)]
544struct DataWriterImpl {
545 /// The underlying StreamTarget object.
546 s: StreamTarget,
547
548 /// Buffered data to send over the connection.
549 // TODO: this buffer is probably smaller than we want, but it's good
550 // enough for now. If we _do_ make it bigger, we'll have to change
551 // our use of Data::split_from to handle the case where we can't fit
552 // all the data.
553 #[educe(Debug(method = "skip_fmt"))]
554 buf: Box<[u8; Data::MAXLEN]>,
555
556 /// Number of unflushed bytes in buf.
557 n_pending: usize,
558
559 /// Shared user-visible information about the state of this stream.
560 #[cfg(feature = "stream-ctrl")]
561 status: Arc<Mutex<DataStreamStatus>>,
562}
563
564impl DataWriter {
565 /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
566 /// interact with this stream without holding the stream itself.
567 #[cfg(feature = "stream-ctrl")]
568 pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
569 Some(&self.ctrl)
570 }
571
572 /// Helper for poll_flush() and poll_close(): Performs a flush, then
573 /// closes the stream if should_close is true.
574 fn poll_flush_impl(
575 mut self: Pin<&mut Self>,
576 cx: &mut Context<'_>,
577 should_close: bool,
578 ) -> Poll<IoResult<()>> {
579 let state = self.state.take().expect("Missing state in DataWriter");
580
581 // TODO: this whole function is a bit copy-pasted.
582 let mut future: BoxSyncFuture<_> = match state {
583 DataWriterState::Ready(imp) => {
584 if imp.n_pending == 0 {
585 // Nothing to flush!
586 if should_close {
587 // We need to actually continue with this function to do the closing.
588 // Thus, make a future that does nothing and is ready immediately.
589 Box::pin(futures::future::ready((imp, Ok(()))))
590 } else {
591 // There's nothing more to do; we can return.
592 self.state = Some(DataWriterState::Ready(imp));
593 return Poll::Ready(Ok(()));
594 }
595 } else {
596 // We need to flush the buffer's contents; Make a future for that.
597 Box::pin(imp.flush_buf())
598 }
599 }
600 DataWriterState::Flushing(fut) => fut,
601 DataWriterState::Closed => {
602 self.state = Some(DataWriterState::Closed);
603 return Poll::Ready(Err(Error::NotConnected.into()));
604 }
605 };
606
607 match future.as_mut().poll(cx) {
608 Poll::Ready((_imp, Err(e))) => {
609 self.state = Some(DataWriterState::Closed);
610 Poll::Ready(Err(e.into()))
611 }
612 Poll::Ready((mut imp, Ok(()))) => {
613 if should_close {
614 // Tell the StreamTarget to close, so that the reactor
615 // realizes that we are done sending. (Dropping `imp.s` does not
616 // suffice, since there may be other clones of it. In particular,
617 // the StreamReader has one, which it uses to keep the stream
618 // open, among other things.)
619 imp.s.close();
620
621 #[cfg(feature = "stream-ctrl")]
622 {
623 // TODO RPC: This is not sufficient to track every case
624 // where we might have sent an End. See note on the
625 // `sent_end` field.
626 imp.status.lock().expect("lock poisoned").sent_end = true;
627 }
628 self.state = Some(DataWriterState::Closed);
629 } else {
630 self.state = Some(DataWriterState::Ready(imp));
631 }
632 Poll::Ready(Ok(()))
633 }
634 Poll::Pending => {
635 self.state = Some(DataWriterState::Flushing(future));
636 Poll::Pending
637 }
638 }
639 }
640}
641
642impl AsyncWrite for DataWriter {
643 fn poll_write(
644 mut self: Pin<&mut Self>,
645 cx: &mut Context<'_>,
646 buf: &[u8],
647 ) -> Poll<IoResult<usize>> {
648 if buf.is_empty() {
649 return Poll::Ready(Ok(0));
650 }
651
652 let state = self.state.take().expect("Missing state in DataWriter");
653
654 let mut future = match state {
655 DataWriterState::Ready(mut imp) => {
656 let n_queued = imp.queue_bytes(buf);
657 if n_queued != 0 {
658 self.state = Some(DataWriterState::Ready(imp));
659 return Poll::Ready(Ok(n_queued));
660 }
661 // we couldn't queue anything, so the current cell must be full.
662 Box::pin(imp.flush_buf())
663 }
664 DataWriterState::Flushing(fut) => fut,
665 DataWriterState::Closed => {
666 self.state = Some(DataWriterState::Closed);
667 return Poll::Ready(Err(Error::NotConnected.into()));
668 }
669 };
670
671 match future.as_mut().poll(cx) {
672 Poll::Ready((_imp, Err(e))) => {
673 #[cfg(feature = "stream-ctrl")]
674 {
675 _imp.status.lock().expect("lock poisoned").record_error(&e);
676 }
677 self.state = Some(DataWriterState::Closed);
678 Poll::Ready(Err(e.into()))
679 }
680 Poll::Ready((mut imp, Ok(()))) => {
681 // Great! We're done flushing. Queue as much as we can of this
682 // cell.
683 let n_queued = imp.queue_bytes(buf);
684 self.state = Some(DataWriterState::Ready(imp));
685 Poll::Ready(Ok(n_queued))
686 }
687 Poll::Pending => {
688 self.state = Some(DataWriterState::Flushing(future));
689 Poll::Pending
690 }
691 }
692 }
693
694 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
695 self.poll_flush_impl(cx, false)
696 }
697
698 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
699 self.poll_flush_impl(cx, true)
700 }
701}
702
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        // Adapt our futures-style writer to tokio's trait.
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
717
718impl DataWriterImpl {
719 /// Try to flush the current buffer contents as a data cell.
720 async fn flush_buf(mut self) -> (Self, Result<()>) {
721 let result =
722 if let Some((cell, remainder)) = Data::try_split_from(&self.buf[..self.n_pending]) {
723 // TODO: Eventually we may want a larger buffer; if we do,
724 // this invariant will become false.
725 assert!(remainder.is_empty());
726 self.n_pending = 0;
727 self.s.send(cell.into()).await
728 } else {
729 Ok(())
730 };
731
732 (self, result)
733 }
734
735 /// Add as many bytes as possible from `b` to our internal buffer;
736 /// return the number we were able to add.
737 fn queue_bytes(&mut self, b: &[u8]) -> usize {
738 let empty_space = &mut self.buf[self.n_pending..];
739 if empty_space.is_empty() {
740 // that is, len == 0
741 return 0;
742 }
743
744 let n_to_copy = std::cmp::min(b.len(), empty_space.len());
745 empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
746 self.n_pending += n_to_copy;
747 n_to_copy
748 }
749}
750
751impl DataReader {
752 /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
753 /// interact with this stream without holding the stream itself.
754 #[cfg(feature = "stream-ctrl")]
755 pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
756 Some(&self.ctrl)
757 }
758}
759
760/// An enumeration for the state of a DataReader.
761///
762/// We have to use an enum here because, when we're waiting for
763/// ReadingCell to complete, the future returned by `read_cell()` owns the
764/// DataCellImpl. If we wanted to store the future and the cell at the
765/// same time, we'd need to make a self-referential structure, which isn't
766/// possible in safe Rust AIUI.
767#[derive(Educe)]
768#[educe(Debug)]
769enum DataReaderState {
770 /// In this state we have received an end cell or an error.
771 Closed,
772 /// In this state the reader is not currently fetching a cell; it
773 /// either has data or not.
774 Ready(DataReaderImpl),
775 /// The reader is currently fetching a cell: this future is the
776 /// progress it is making.
777 ReadingCell(
778 #[educe(Debug(method = "skip_fmt"))] //
779 BoxSyncFuture<'static, (DataReaderImpl, Result<()>)>,
780 ),
781}
782
783/// Wrapper for the read part of a DataStream
784#[derive(Educe)]
785#[educe(Debug)]
786struct DataReaderImpl {
787 /// The underlying StreamReader object.
788 #[educe(Debug(method = "skip_fmt"))]
789 s: StreamReader,
790
791 /// If present, data that we received on this stream but have not
792 /// been able to send to the caller yet.
793 // TODO: This data structure is probably not what we want, but
794 // it's good enough for now.
795 #[educe(Debug(method = "skip_fmt"))]
796 pending: Vec<u8>,
797
798 /// Index into pending to show what we've already read.
799 offset: usize,
800
801 /// If true, we have received a CONNECTED cell on this stream.
802 connected: bool,
803
804 /// Shared user-visible information about the state of this stream.
805 #[cfg(feature = "stream-ctrl")]
806 status: Arc<Mutex<DataStreamStatus>>,
807}
808
809impl AsyncRead for DataReader {
810 fn poll_read(
811 mut self: Pin<&mut Self>,
812 cx: &mut Context<'_>,
813 buf: &mut [u8],
814 ) -> Poll<IoResult<usize>> {
815 // We're pulling the state object out of the reader. We MUST
816 // put it back before this function returns.
817 let mut state = self.state.take().expect("Missing state in DataReader");
818
819 loop {
820 let mut future = match state {
821 DataReaderState::Ready(mut imp) => {
822 // There may be data to read already.
823 let n_copied = imp.extract_bytes(buf);
824 if n_copied != 0 {
825 // We read data into the buffer. Tell the caller.
826 self.state = Some(DataReaderState::Ready(imp));
827 return Poll::Ready(Ok(n_copied));
828 }
829
830 // No data available! We have to launch a read.
831 Box::pin(imp.read_cell())
832 }
833 DataReaderState::ReadingCell(fut) => fut,
834 DataReaderState::Closed => {
835 self.state = Some(DataReaderState::Closed);
836 return Poll::Ready(Err(Error::NotConnected.into()));
837 }
838 };
839
840 // We have a future that represents an in-progress read.
841 // See if it can make progress.
842 match future.as_mut().poll(cx) {
843 Poll::Ready((_imp, Err(e))) => {
844 // There aren't any survivable errors in the current
845 // design.
846 self.state = Some(DataReaderState::Closed);
847 #[cfg(feature = "stream-ctrl")]
848 {
849 _imp.status.lock().expect("lock poisoned").record_error(&e);
850 }
851 let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
852 Ok(0)
853 } else {
854 Err(e.into())
855 };
856 return Poll::Ready(result);
857 }
858 Poll::Ready((imp, Ok(()))) => {
859 // It read a cell! Continue the loop.
860 state = DataReaderState::Ready(imp);
861 }
862 Poll::Pending => {
863 // The future is pending; store it and tell the
864 // caller to get back to us later.
865 self.state = Some(DataReaderState::ReadingCell(future));
866 return Poll::Pending;
867 }
868 }
869 }
870 }
871}
872
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        // Adapt our futures-style reader to tokio's trait.
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
883
884impl DataReaderImpl {
885 /// Pull as many bytes as we can off of self.pending, and return that
886 /// number of bytes.
887 fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
888 let remainder = &self.pending[self.offset..];
889 let n_to_copy = std::cmp::min(buf.len(), remainder.len());
890 buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
891 self.offset += n_to_copy;
892
893 n_to_copy
894 }
895
896 /// Return true iff there are no buffered bytes here to yield
897 fn buf_is_empty(&self) -> bool {
898 self.pending.len() == self.offset
899 }
900
901 /// Load self.pending with the contents of a new data cell.
902 ///
903 /// This function takes ownership of self so that we can avoid
904 /// self-referential lifetimes.
905 async fn read_cell(mut self) -> (Self, Result<()>) {
906 use DataStreamMsg::*;
907 let msg = match self.s.recv().await {
908 Ok(unparsed) => match unparsed.decode::<DataStreamMsg>() {
909 Ok(cell) => cell.into_msg(),
910 Err(e) => {
911 self.s.protocol_error();
912 return (
913 self,
914 Err(Error::from_bytes_err(e, "message on a data stream")),
915 );
916 }
917 },
918 Err(e) => return (self, Err(e)),
919 };
920
921 let result = match msg {
922 Connected(_) if !self.connected => {
923 self.connected = true;
924 #[cfg(feature = "stream-ctrl")]
925 {
926 self.status
927 .lock()
928 .expect("poisoned lock")
929 .record_connected();
930 }
931 Ok(())
932 }
933 Connected(_) => {
934 self.s.protocol_error();
935 Err(Error::StreamProto(
936 "Received a second connect cell on a data stream".to_string(),
937 ))
938 }
939 Data(d) if self.connected => {
940 self.add_data(d.into());
941 Ok(())
942 }
943 Data(_) => {
944 self.s.protocol_error();
945 Err(Error::StreamProto(
946 "Received a data cell an unconnected stream".to_string(),
947 ))
948 }
949 End(e) => Err(Error::EndReceived(e.reason())),
950 };
951
952 (self, result)
953 }
954
955 /// Add the data from `d` to the end of our pending bytes.
956 fn add_data(&mut self, mut d: Vec<u8>) {
957 if self.buf_is_empty() {
958 // No data pending? Just take d as the new pending.
959 self.pending = d;
960 self.offset = 0;
961 } else {
962 // TODO(nickm) This has potential to grow `pending` without bound.
963 // Fortunately, we don't currently read cells or call this
964 // `add_data` method when pending is nonempty—but if we do in the
965 // future, we'll have to be careful here.
966 self.pending.append(&mut d);
967 }
968 }
969}
970
/// A `CmdChecker` that enforces the invariants of outbound data streams.
#[derive(Debug)]
pub(crate) struct DataCmdChecker {
    /// Set while we are still waiting for this stream's CONNECTED message.
    expecting_connected: bool,
}
977
978impl Default for DataCmdChecker {
979 fn default() -> Self {
980 Self {
981 expecting_connected: true,
982 }
983 }
984}
985
986impl super::CmdChecker for DataCmdChecker {
987 fn check_msg(
988 &mut self,
989 msg: &tor_cell::relaycell::UnparsedRelayMsg,
990 ) -> Result<super::StreamStatus> {
991 use super::StreamStatus::*;
992 match msg.cmd() {
993 RelayCmd::CONNECTED => {
994 if !self.expecting_connected {
995 Err(Error::StreamProto(
996 "Received CONNECTED twice on a stream.".into(),
997 ))
998 } else {
999 self.expecting_connected = false;
1000 Ok(Open)
1001 }
1002 }
1003 RelayCmd::DATA => {
1004 if !self.expecting_connected {
1005 Ok(Open)
1006 } else {
1007 Err(Error::StreamProto(
1008 "Received DATA before CONNECTED on a stream".into(),
1009 ))
1010 }
1011 }
1012 RelayCmd::END => Ok(Closed),
1013 _ => Err(Error::StreamProto(format!(
1014 "Unexpected {} on a data stream!",
1015 msg.cmd()
1016 ))),
1017 }
1018 }
1019
1020 fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1021 let _ = msg
1022 .decode::<DataStreamMsg>()
1023 .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1024 Ok(())
1025 }
1026}
1027
1028impl DataCmdChecker {
1029 /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
1030 /// constructed connection.
1031 pub(crate) fn new_any() -> AnyCmdChecker {
1032 Box::<Self>::default()
1033 }
1034
1035 /// Return a new boxed `DataCmdChecker` in a state suitable for a
1036 /// connection where an initial CONNECTED cell is not expected.
1037 ///
1038 /// This is used by hidden services, exit relays, and directory servers
1039 /// to accept streams.
1040 #[cfg(feature = "hs-service")]
1041 pub(crate) fn new_connected() -> AnyCmdChecker {
1042 Box::new(Self {
1043 expecting_connected: false,
1044 })
1045 }
1046}