1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
// Copyright 2018 Parity Technologies (UK) Ltd. // // Licensed under the Apache License, Version 2.0 or MIT license, at your option. // // A copy of the Apache License, Version 2.0 is included in the software as // LICENSE-APACHE and a copy of the MIT license is included in the software // as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. //! Implementation of [yamux](https://github.com/hashicorp/yamux/blob/master/spec.md), a multiplexer //! over reliable, ordered connections, such as TCP/IP. //! //! The two primary objects, clients of this crate interact with, are `Connection` and //! `StreamHandle`. The former wraps the underlying connection and multiplexes `StreamHandle`s //! which implement `tokio_io::AsyncRead` and `tokio_io::AsyncWrite` over it. //! `Connection` implements `futures::Stream` yielding `StreamHandle`s for inbound connection //! attempts. mod chunks; mod connection; mod error; #[allow(dead_code)] mod frame; mod notify; mod stream; pub use crate::connection::{Connection, Mode, StreamHandle}; pub use crate::error::{DecodeError, ConnectionError}; pub use crate::stream::State; pub(crate) const DEFAULT_CREDIT: u32 = 256 * 1024; // as per yamux specification /// Specifies when window update frames are sent. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum WindowUpdateMode { /// Send window updates as soon as a stream's receive window drops to 0. /// /// This ensures that the sender can resume sending more data as soon as possible /// but a slow reader on the receiving side may be overwhelmed, i.e. it accumulates /// data in its buffer which may reach its limit (see `set_max_buffer_size`). /// In this mode, window updates merely prevent head of line blocking but do not /// effectively exercise back-pressure on senders. OnReceive, /// Send window updates only when data is read on the receiving end. /// /// This ensures that senders do not overwhelm receivers and keeps buffer usage /// low. However, depending on the protocol, there is a risk of deadlock, namely /// if both endpoints want to send data larger than the receivers window and they /// do not read before finishing their writes. Use this mode only if you are sure /// that this will never happen, i.e. if /// /// - Endpoints *A* and *B* never write at the same time, or /// - Endpoints *A* and *B* write at most *n* frames concurrently such that the sum /// of the frame lengths is less or equal to the available credit of *A* and *B* /// respectively. OnRead } /// Yamux configuration. /// /// The default configuration values are as follows: /// /// - receive window = 256 KiB /// - max. buffer size (per stream) = 1 MiB /// - max. number of streams = 8192 /// - window update mode = on receive /// - read after close = true #[derive(Debug, Clone)] pub struct Config { pub(crate) receive_window: u32, pub(crate) max_buffer_size: usize, pub(crate) max_num_streams: usize, pub(crate) window_update_mode: WindowUpdateMode, pub(crate) read_after_close: bool } impl Default for Config { fn default() -> Self { Config { receive_window: DEFAULT_CREDIT, max_buffer_size: 1024 * 1024, max_num_streams: 8192, window_update_mode: WindowUpdateMode::OnReceive, read_after_close: true } } } impl Config { /// Set the receive window (must be >= 256 KiB). pub fn set_receive_window(&mut self, n: u32) -> Result<(), ()> { if n >= DEFAULT_CREDIT { self.receive_window = n; return Ok(()) } Err(()) } /// Set the max. buffer size per stream. pub fn set_max_buffer_size(&mut self, n: usize) { self.max_buffer_size = n } /// Set the max. number of streams. pub fn set_max_num_streams(&mut self, n: usize) { self.max_num_streams = n } /// Set the window update mode to use. pub fn set_window_update_mode(&mut self, m: WindowUpdateMode) { self.window_update_mode = m } /// Allow or disallow streams to read from buffered data after /// the connection has been closed. pub fn set_read_after_close(&mut self, b: bool) { self.read_after_close = b; } }