pub mod connection;
pub mod error;
mod frame;
mod pause;
use async_trait::async_trait;
use futures::FutureExt;
use log::{debug, trace};
use std::fmt;
use crate::connection::Connection;
use connection::{control::Control, stream::Stream, Id, Mode};
use error::ConnectionError;
use futures::future::BoxFuture;
use libp2prs_core::identity::Keypair;
use libp2prs_core::muxing::{IReadWrite, IStreamMuxer, ReadWriteEx, StreamInfo, StreamMuxer, StreamMuxerEx};
use libp2prs_core::secure_io::SecureInfo;
use libp2prs_core::transport::{ConnectionInfo, TransportError};
use libp2prs_core::upgrade::{UpgradeInfo, Upgrader};
use libp2prs_core::{Multiaddr, PeerId, PublicKey};
use libp2prs_traits::{SplitEx, SplittableReadWrite};
const DEFAULT_CREDIT: u32 = 256 * 1024;
const MAX_MSG_SIZE: usize = 64 * 1024;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowUpdateMode {
OnReceive,
OnRead,
}
#[derive(Debug, Clone)]
pub struct Config {
receive_window: u32,
max_buffer_size: usize,
max_num_streams: usize,
max_message_size: usize,
window_update_mode: WindowUpdateMode,
read_after_close: bool,
lazy_open: bool,
}
impl Default for Config {
fn default() -> Self {
Config {
receive_window: DEFAULT_CREDIT,
max_buffer_size: 1024 * 1024,
max_num_streams: 8192,
max_message_size: MAX_MSG_SIZE,
window_update_mode: WindowUpdateMode::OnReceive,
read_after_close: true,
lazy_open: false,
}
}
}
impl Config {
pub fn new() -> Self {
Config::default()
}
pub fn set_receive_window(&mut self, n: u32) -> &mut Self {
assert!(n >= DEFAULT_CREDIT);
self.receive_window = n;
self
}
pub fn set_max_buffer_size(&mut self, n: usize) -> &mut Self {
self.max_buffer_size = n;
self
}
pub fn set_max_num_streams(&mut self, n: usize) -> &mut Self {
self.max_num_streams = n;
self
}
pub fn set_max_message_size(&mut self, n: usize) -> &mut Self {
self.max_message_size = n;
self
}
pub fn set_window_update_mode(&mut self, m: WindowUpdateMode) -> &mut Self {
self.window_update_mode = m;
self
}
pub fn set_read_after_close(&mut self, b: bool) -> &mut Self {
self.read_after_close = b;
self
}
pub fn set_lazy_open(&mut self, b: bool) -> &mut Self {
self.lazy_open = b;
self
}
}
pub struct Yamux<C: SplitEx> {
connection: Option<Connection<C>>,
control: Control,
id: Id,
pub la: Multiaddr,
pub ra: Multiaddr,
pub local_priv_key: Keypair,
pub local_peer_id: PeerId,
pub remote_pub_key: PublicKey,
pub remote_peer_id: PeerId,
}
impl<C: SplitEx> Clone for Yamux<C> {
fn clone(&self) -> Self {
Yamux {
connection: None,
control: self.control.clone(),
id: self.id,
la: self.la.clone(),
ra: self.ra.clone(),
local_priv_key: self.local_priv_key.clone(),
local_peer_id: self.local_peer_id,
remote_pub_key: self.remote_pub_key.clone(),
remote_peer_id: self.remote_peer_id,
}
}
}
impl<C: SplitEx> fmt::Debug for Yamux<C> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Yamux")
.field("Id", &self.id)
.field("Ra", &self.ra)
.field("Rid", &self.remote_peer_id)
.finish()
}
}
impl<C: ConnectionInfo + SecureInfo + SplittableReadWrite> Yamux<C> {
pub fn new(io: C, mut cfg: Config, mode: Mode) -> Self {
cfg.set_read_after_close(false);
let local_priv_key = io.local_priv_key();
let local_peer_id = io.local_peer();
let remote_pub_key = io.remote_pub_key();
let remote_peer_id = io.remote_peer();
let la = io.local_multiaddr();
let ra = io.remote_multiaddr();
let conn = Connection::new(io, cfg, mode);
let id = conn.id();
let control = conn.control();
Yamux {
connection: Some(conn),
control,
id,
la,
ra,
local_priv_key,
local_peer_id,
remote_pub_key,
remote_peer_id,
}
}
}
impl<C: SplitEx> SecureInfo for Yamux<C> {
fn local_peer(&self) -> PeerId {
self.local_peer_id
}
fn remote_peer(&self) -> PeerId {
self.remote_peer_id
}
fn local_priv_key(&self) -> Keypair {
self.local_priv_key.clone()
}
fn remote_pub_key(&self) -> PublicKey {
self.remote_pub_key.clone()
}
}
impl<C: SplitEx> ConnectionInfo for Yamux<C> {
fn local_multiaddr(&self) -> Multiaddr {
self.la.clone()
}
fn remote_multiaddr(&self) -> Multiaddr {
self.ra.clone()
}
}
impl StreamInfo for Stream {
fn id(&self) -> usize {
self.id() as usize
}
}
#[async_trait]
impl ReadWriteEx for Stream {
fn box_clone(&self) -> IReadWrite {
Box::new(self.clone())
}
}
impl<C: SplittableReadWrite> StreamMuxerEx for Yamux<C> {}
#[async_trait]
impl<C: SplittableReadWrite> StreamMuxer for Yamux<C> {
async fn open_stream(&mut self) -> Result<IReadWrite, TransportError> {
let s = self.control.open_stream().await?;
trace!("a new outbound substream {:?} opened for yamux... ", s);
Ok(Box::new(s))
}
async fn accept_stream(&mut self) -> Result<IReadWrite, TransportError> {
let s = self.control.accept_stream().await?;
trace!("a new inbound substream {:?} accepted for yamux...", s);
Ok(Box::new(s))
}
async fn close(&mut self) -> Result<(), TransportError> {
self.control.close().await?;
Ok(())
}
fn task(&mut self) -> Option<BoxFuture<'static, ()>> {
if let Some(mut conn) = self.connection.take() {
return Some(
async move {
while conn.next_stream().await.is_ok() {}
debug!("{:?} background-runtime exiting...", conn.id());
}
.boxed(),
);
}
None
}
fn box_clone(&self) -> IStreamMuxer {
Box::new(self.clone())
}
}
impl UpgradeInfo for Config {
type Info = &'static [u8];
fn protocol_info(&self) -> Vec<Self::Info> {
vec![b"/yamux/1.0.0"]
}
}
#[async_trait]
impl<T> Upgrader<T> for Config
where
T: ConnectionInfo + SecureInfo + SplittableReadWrite,
{
type Output = Yamux<T>;
async fn upgrade_inbound(self, socket: T, _info: <Self as UpgradeInfo>::Info) -> Result<Self::Output, TransportError> {
trace!("upgrading yamux inbound");
Ok(Yamux::new(socket, self, Mode::Server))
}
async fn upgrade_outbound(self, socket: T, _info: <Self as UpgradeInfo>::Info) -> Result<Self::Output, TransportError> {
trace!("upgrading yamux outbound");
Ok(Yamux::new(socket, self, Mode::Client))
}
}
impl From<ConnectionError> for TransportError {
fn from(e: ConnectionError) -> Self {
TransportError::StreamMuxerError(Box::new(e))
}
}