use std::{any::TypeId, fmt::Debug, pin::Pin, task::Poll};
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::{tcp::OwnedWriteHalf, TcpStream},
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
};
use url::Url;
use super::{
listener::{ConnectInfo, Local, Peer, WireListenerEvent},
unwire::{Unwire, Unwiring},
wired::{HandleEvent, WiredHandle, WiredServer},
ConnectConfig, IoSplit, SplitStream,
};
// Numeric identifier type used for wires throughout this file.
type WireId = u64;
/// Identity of a wire: its id, its access key and the connection info kept with them.
///
/// The `Wiring`/`Unwiring` implementations for this type in this file write and read the
/// fields in declaration order: `wire_id`, `access_key`, `connect_info`.
#[derive(Debug, Clone)]
pub struct WireInfo {
// Id of the wire; exposed through `wire_id()`.
wire_id: WireId,
// Key exposed through `access_key()`.
access_key: u128,
// Connection info; `WireStream::stream` passes it to `connect_stream` to open a new stream.
connect_info: ConnectInfo,
}
impl WireInfo {
pub(crate) fn new(wire_id: WireId, access_key: u128, connect_info: ConnectInfo) -> Self {
Self {
wire_id,
access_key,
connect_info,
}
}
pub fn wire_id(&self) -> WireId {
self.wire_id
}
pub fn access_key(&self) -> u128 {
self.access_key
}
}
impl Unwiring for WireInfo {
fn unwiring<W: Unwire>(wire: &mut W) -> impl std::future::Future<Output = Result<Self, std::io::Error>> + Send {
async move {
Ok(Self {
wire_id: wire.unwiring().await?,
access_key: wire.unwiring().await?,
connect_info: wire.unwiring().await?,
})
}
}
}
pub trait Wire: AsyncWrite + Unpin + Send + 'static + Sync + Sized {
type Stream: SplitStream;
fn stream(&mut self) -> impl std::future::Future<Output = Result<Self::Stream, std::io::Error>> + Send;
fn wire<T: Wiring>(&mut self, t: T) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move {
t.wiring(self).await?;
self.flush().await?;
Ok(())
}
}
fn wiring<T: Wiring>(&mut self, item: T) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
item.wiring(self)
}
}
impl<T: AsyncWrite + Send + AsyncRead + 'static + Sync + Unpin + Debug> Wire for tokio::io::WriteHalf<T> {
    type Stream = IoSplit<T>;

    /// Always fails with `ErrorKind::Unsupported`: a write half cannot open a new stream.
    fn stream(&mut self) -> impl std::future::Future<Output = Result<Self::Stream, std::io::Error>> + Send {
        async {
            let unsupported = std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "Cannot to establish stream from WriteHalf",
            );
            Err(unsupported)
        }
    }
}
impl Wire for OwnedWriteHalf {
    type Stream = TcpStream;

    /// Always fails with `ErrorKind::Unsupported`: no new `TcpStream` is opened from an
    /// owned write half.
    fn stream(&mut self) -> impl std::future::Future<Output = Result<TcpStream, std::io::Error>> + Send {
        async {
            let unsupported = std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "TcpStream from OwnedWriteHalf is not supported",
            );
            Err(unsupported)
        }
    }
}
impl Wire for TcpStream {
    type Stream = Self;

    /// Always fails with `ErrorKind::Unsupported`: a plain `TcpStream` does not open
    /// further streams.
    fn stream(&mut self) -> impl std::future::Future<Output = Result<TcpStream, std::io::Error>> + Send {
        async {
            let unsupported = std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "TcpStream from stream is not supported",
            );
            Err(unsupported)
        }
    }
}
/// Holder for a stream that can be handed out exactly once.
///
/// The inner `Option` is taken by `Unwire::stream`; reading from the holder itself
/// always fails (see its `AsyncRead` implementation).
#[derive(Debug)]
struct ConsumeWire<T>(Option<T>);
impl<T: SplitStream> Unwire for ConsumeWire<T> {
    type Stream = T;

    /// Hands out the wrapped stream the first time it is called; afterwards the slot is
    /// empty and `ErrorKind::BrokenPipe` is returned.
    fn stream(&mut self) -> impl std::future::Future<Output = Result<Self::Stream, std::io::Error>> + Send {
        async move {
            self.0.take().ok_or_else(|| {
                std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Unable to consume a wire")
            })
        }
    }
}
impl<T: AsyncRead + Unpin> AsyncRead for ConsumeWire<T> {
    /// Never reads: every poll completes immediately with `ErrorKind::NotFound`.
    fn poll_read(
        self: Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
        _: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let refused = std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "Unable to poll_read from a consumed wire",
        );
        Poll::Ready(Err(refused))
    }
}
impl<T: AsyncWrite + Send + Sync + Unpin + 'static, C: ConnectConfig> Wire for WireStream<T, C>
where
    C::Stream: SplitStream,
{
    type Stream = WireStream<C::Stream, C>;

    /// Opens a new connection to this wire's peer and returns it as a new `WireStream`.
    ///
    /// The peer's connect info is used to connect and enhance a raw stream. With a local
    /// listener handle the stream is handed to the listener as an `OutgoingWire` event and
    /// the resulting wire is awaited; without one the handshake is written directly
    /// (`0u8`, then the forward info). In both cases the remote `WireInfo` is read from the
    /// new wire, attached to it as its peer, and the remote wire id is then written to
    /// `self`.
    ///
    /// # Errors
    ///
    /// * `AddrNotAvailable` when this wire has no peer.
    /// * `Interrupted` when the listener drops the reply channel.
    /// * Any I/O error raised while connecting or exchanging the handshake.
    fn stream(&mut self) -> impl std::future::Future<Output = Result<Self::Stream, std::io::Error>> + Send {
        async {
            // Build the error lazily so the common (peer present) path does not allocate.
            let peer = self.peer.as_ref().ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::AddrNotAvailable,
                    "Wire doesn't have peer connect info",
                )
            })?;
            let connect_info = &peer.wire_info.connect_info;
            let stream: <C as ConnectConfig>::RawStream = peer.connect_config.connect_stream(connect_info).await?;
            let stream = peer.connect_config.enhance_stream(stream)?;
            if let Some(local_handle) = &peer.local_handle {
                let (reply, rx) = oneshot::channel();
                // A failed send drops `reply`, which surfaces below as `Interrupted`.
                local_handle
                    .send(WireListenerEvent::OutgoingWire {
                        stream,
                        forward_info: Some((peer.wire_info.wire_id(), peer.wire_info.access_key())),
                        reply,
                    })
                    .ok();
                let mut wire = rx
                    .await
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Interrupted, e))??;
                let remote_info: WireInfo = wire.unwire().await?;
                let remote_wire_id = remote_info.wire_id();
                wire = wire.with_peer(Peer::<C>::new(
                    remote_info,
                    Some(local_handle.clone()),
                    peer.connect_config.clone(),
                ));
                // Write the id of the new remote wire on the existing wire.
                self.wiring(remote_wire_id).await?;
                Ok(wire)
            } else {
                let mut wire = WireStream::new(stream);
                0u8.wiring(&mut wire).await?;
                let forward_info = Some((peer.wire_info.wire_id(), peer.wire_info.access_key()));
                wire.wire(forward_info).await?;
                let remote_info: WireInfo = wire.unwire().await?;
                let remote_wire_id = remote_info.wire_id();
                wire = wire.with_peer(Peer::new(remote_info, None, peer.connect_config.clone()));
                // Write the id of the new remote wire on the existing wire.
                self.wiring(remote_wire_id).await?;
                Ok(wire)
            }
        }
    }
}
impl<T: AsyncRead, C: ConnectConfig> AsyncRead for WireStream<T, C> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().stream.poll_read(cx, buf)
}
}
impl<T: AsyncWrite, C: ConnectConfig> AsyncWrite for WireStream<T, C> {
fn is_write_vectored(&self) -> bool {
self.stream.is_write_vectored()
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().stream.poll_flush(cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().stream.poll_shutdown(cx)
}
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
self.project().stream.poll_write(cx, buf)
}
fn poll_write_vectored(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> std::task::Poll<Result<usize, std::io::Error>> {
self.project().stream.poll_write_vectored(cx, bufs)
}
}
/// Receiver of `WireStream`s for the connect configuration `C`.
pub trait HandleWire<C: ConnectConfig> {
/// Error returned when a stream cannot be handled.
type Error: std::error::Error;
/// Takes ownership of `stream`; returns `Self::Error` when it could not be handled.
fn handle_wire(&mut self, stream: WireStream<C::Stream, C>) -> Result<(), Self::Error>;
}
impl<C: ConnectConfig> HandleWire<C> for UnboundedSender<WireStream<C::Stream, C>> {
type Error = tokio::sync::mpsc::error::SendError<WireStream<C::Stream, C>>;
fn handle_wire(
&mut self,
stream: WireStream<C::Stream, C>,
) -> Result<(), tokio::sync::mpsc::error::SendError<WireStream<C::Stream, C>>> {
self.send(stream)
}
}
/// A stream of type `T` together with optional local and peer metadata.
///
/// `AsyncRead`/`AsyncWrite` are implemented by forwarding to the pinned `stream` field.
#[pin_project]
#[derive(Debug)]
pub struct WireStream<T, C: ConnectConfig> {
// Local-side metadata; `None` until set through `with_local`.
pub(crate) local: Option<Local<C>>,
// Peer-side metadata; `None` until set through `with_peer`. `Wire::stream` requires it.
pub(crate) peer: Option<Peer<C>>,
// The wrapped stream; structurally pinned so the poll methods can project to it.
#[pin]
pub(crate) stream: T,
}
impl<T, C: ConnectConfig> WireStream<T, C> {
pub fn new(stream: T) -> Self {
Self {
local: None,
peer: None,
stream,
}
}
pub(crate) fn with_local(mut self, local: Local<C>) -> Self {
self.local.replace(local);
self
}
pub(crate) fn with_peer(mut self, peer: Peer<C>) -> Self {
self.peer.replace(peer);
self
}
pub async fn into<R: Unwiring>(self) -> Result<R, std::io::Error>
where
C: ConnectConfig<Stream = T>,
Self: SplitStream,
{
let mut consume = ConsumeWire(Some(self));
consume.unwire::<R>().await
}
pub fn wired<LocalEvent, RemoteEvent, H: HandleEvent<RemoteEvent>>(
self,
handle_event: H,
) -> Result<WiredHandle<LocalEvent>, std::io::Error>
where
LocalEvent: Wiring + Debug + 'static,
RemoteEvent: Unwiring + Debug + 'static,
Self: SplitStream,
{
let h = WiredServer::new(self, handle_event)?.run();
Ok(h)
}
}
/// A sender/receiver pair.
pub struct WireChannel<Sender, Receiver> {
    /// Sending half.
    pub sender: Sender,
    /// Receiving half.
    pub receiver: Receiver,
}

#[allow(dead_code)]
impl<S, R> WireChannel<S, R> {
    /// Pairs `sender` with `receiver`.
    fn new(sender: S, receiver: R) -> Self {
        WireChannel { sender, receiver }
    }

    /// Splits the channel back into `(sender, receiver)`.
    pub fn into_inner(self) -> (S, R) {
        let Self { sender, receiver } = self;
        (sender, receiver)
    }
}
impl<S: Wiring + 'static, R: Unwiring + 'static> Unwiring
    for WireChannel<tokio::sync::mpsc::Sender<S>, tokio::sync::mpsc::Receiver<R>>
{
    /// Takes a stream from `wire`, splits it, and bridges it to a pair of bounded channels.
    ///
    /// Two tasks are spawned: one writes every item sent on the returned sender to the
    /// stream, the other reads `R` values from the stream into the returned receiver.
    /// Both channels use `wire.bounded_buffer()` as their capacity.
    fn unwiring<W: Unwire>(wire: &mut W) -> impl std::future::Future<Output = Result<Self, std::io::Error>> + Send {
        async move {
            let buffer = wire.bounded_buffer();
            let stream = wire.stream().await?;
            let (mut reader, mut writer) = stream.split()?;

            let (sender, mut outgoing) = tokio::sync::mpsc::channel::<S>(buffer.into());
            let writer_task = tokio::spawn(
                async move {
                    while let Some(item) = outgoing.recv().await {
                        if writer.wire(item).await.is_err() {
                            // Stop accepting items once the stream can no longer be written.
                            outgoing.close();
                            break;
                        }
                    }
                    writer.shutdown().await.ok();
                }
                .boxed(),
            );

            let (incoming, receiver) = tokio::sync::mpsc::channel::<R>(buffer.into());
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = incoming.closed() => break,
                            item = reader.unwire::<R>() => match item {
                                Ok(item) => {
                                    incoming.send(item).await.ok();
                                }
                                Err(_) => break,
                            },
                        }
                    }
                    // The reading side ending also ends the writing side.
                    writer_task.abort();
                }
                .boxed(),
            );

            Ok(Self::new(sender, receiver))
        }
    }
}
impl<S: Wiring + 'static, R: Unwiring + 'static> Unwiring for WireChannel<UnboundedSender<S>, UnboundedReceiver<R>> {
    /// Takes a stream from `wire`, splits it, and bridges it to a pair of unbounded channels.
    ///
    /// Two tasks are spawned: one writes every item sent on the returned sender to the
    /// stream, the other reads `R` values from the stream into the returned receiver.
    fn unwiring<W: Unwire>(wire: &mut W) -> impl std::future::Future<Output = Result<Self, std::io::Error>> + Send {
        async move {
            let stream = wire.stream().await?;
            let (mut reader, mut writer) = stream.split()?;

            let (sender, mut outgoing) = tokio::sync::mpsc::unbounded_channel::<S>();
            let writer_task = tokio::spawn(
                async move {
                    while let Some(item) = outgoing.recv().await {
                        if writer.wire(item).await.is_err() {
                            // Stop accepting items once the stream can no longer be written.
                            outgoing.close();
                            break;
                        }
                    }
                    writer.shutdown().await.ok();
                }
                .boxed(),
            );

            let (incoming, receiver) = tokio::sync::mpsc::unbounded_channel::<R>();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = incoming.closed() => break,
                            item = reader.unwire::<R>() => match item {
                                Ok(item) => {
                                    incoming.send(item).ok();
                                }
                                Err(_) => break,
                            },
                        }
                    }
                    // The reading side ending also ends the writing side.
                    writer_task.abort();
                }
                .boxed(),
            );

            Ok(Self::new(sender, receiver))
        }
    }
}
/// Marker used as the default `H` of `WireConfig` when no listener handle is attached.
#[derive(Debug, Clone, Copy)]
pub struct NoHandle;
/// A connect configuration `C` paired with a handle `H`.
///
/// `H` is `NoHandle` when built with `WireConfig::new`, or an unbounded sender of
/// `WireListenerEvent<C>` when built with `with_handle`.
#[derive(Debug, Clone, Copy)]
pub struct WireConfig<C: ConnectConfig, H = NoHandle> {
// Used to connect and enhance streams.
config: C,
// Handle events are sent through; `NoHandle` when there is none.
handle: H,
}
#[allow(dead_code)]
impl<C: ConnectConfig> WireConfig<C> {
    /// Creates a configuration with no listener handle.
    pub fn new(config: C) -> Self {
        WireConfig {
            config,
            handle: NoHandle,
        }
    }

    /// Creates a configuration whose events are sent through `handle`.
    pub(crate) fn with_handle(
        config: C,
        handle: tokio::sync::mpsc::UnboundedSender<WireListenerEvent<C>>,
    ) -> WireConfig<C, tokio::sync::mpsc::UnboundedSender<WireListenerEvent<C>>> {
        WireConfig { config, handle }
    }

    /// Connects to `connect_info` and returns the resulting wire with its peer attached.
    ///
    /// After connecting and enhancing the stream, `0u16` is written and flushed, then the
    /// remote wire info is read and stored as the peer (without a local handle).
    pub async fn connect(&self, connect_info: &ConnectInfo) -> Result<WireStream<C::Stream, C>, std::io::Error> {
        let raw = self.config.connect_stream(connect_info).await?;
        let mut wire = WireStream::new(self.config.enhance_stream(raw)?);
        wire.wire(0u16).await?;
        let wire_info = wire.unwire().await?;
        Ok(wire.with_peer(Peer::new(wire_info, None, self.config.clone())))
    }
}
#[allow(dead_code)]
impl<C: ConnectConfig> WireConfig<C, tokio::sync::mpsc::UnboundedSender<WireListenerEvent<C>>> {
    /// Registers an incoming raw stream with the listener.
    ///
    /// The stream is enhanced, then the remote info and the forward info are read from it
    /// and sent to the listener as an `Incomingwire` event. With `RETURN == true` a reply
    /// channel is attached and its answer is returned; otherwise `Ok(None)` is returned as
    /// soon as the event is sent.
    ///
    /// # Errors
    ///
    /// `NotConnected` when the event cannot be sent, `Interrupted` when the reply channel
    /// is dropped, or any I/O error from reading the stream.
    pub async fn wire<const RETURN: bool>(
        &self,
        stream: C::RawStream,
    ) -> Result<Option<WireStream<C::Stream, C>>, std::io::Error> {
        let mut stream = self.config.enhance_stream(stream)?;
        let remote_info = stream.unwire().await?;
        let forward_info = stream.unwire().await?;

        // A reply channel only exists when the caller wants the answer back.
        let (reply, pending) = if RETURN {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let event = WireListenerEvent::Incomingwire {
            stream,
            remote_info,
            forward_info,
            reply,
        };
        self.handle
            .send(event)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::NotConnected, e))?;

        match pending {
            Some(rx) => rx
                .await
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Interrupted, e))?,
            None => Ok(None),
        }
    }

    /// Sends a `Shutdown` event to the listener; a failed send is ignored.
    pub fn shutdown(&self) {
        let _ = self.handle.send(WireListenerEvent::<C>::Shutdown);
    }

    /// Connects to `connect_info` through the listener and returns the resulting wire.
    ///
    /// The enhanced stream is sent as an `OutgoingWire` event without forward info; the
    /// wire from the reply then has the peer info read from it and attached, together with
    /// a clone of this handle and configuration.
    ///
    /// # Errors
    ///
    /// `BrokenPipe` when the event cannot be sent or the reply channel is dropped, or any
    /// I/O error from connecting or reading the peer info.
    pub async fn connect(&self, connect_info: &ConnectInfo) -> Result<WireStream<C::Stream, C>, std::io::Error> {
        let raw = self.config.connect_stream(connect_info).await?;
        let stream = self.config.enhance_stream(raw)?;
        let (reply, pending) = oneshot::channel();
        self.handle
            .send(WireListenerEvent::OutgoingWire {
                stream,
                forward_info: None,
                reply,
            })
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
        let mut wire = pending
            .await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))??;
        let peer_info = wire.unwire().await?;
        let peer = Peer::new(peer_info, Some(self.handle.clone()), self.config.clone());
        Ok(wire.with_peer(peer))
    }
}
/// A value that can be written to a [`Wire`].
pub trait Wiring: Send + Sync {
/// Defaults to `true`. The channel implementations in this file, which open a separate
/// stream via `wire.stream()`, override it to `false`.
/// NOTE(review): how consumers interpret this flag is not visible here — confirm.
const SAFE: bool = true;
/// Consumes `self` and writes it to `wire`, returning the first I/O error encountered.
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send;
}
impl Wiring for WireInfo {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move {
self.wire_id.wiring(wire).await?;
self.access_key.wiring(wire).await?;
self.connect_info.wiring(wire).await
}
}
}
impl<'a> Wiring for &'a WireInfo {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move {
self.wire_id.wiring(wire).await?;
self.access_key.wiring(wire).await?;
(&self.connect_info).wiring(wire).await
}
}
}
impl Wiring for String {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move { self.as_bytes().wiring(wire).await }
}
}
impl<'a> Wiring for &'a String {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move { self.as_bytes().wiring(wire).await }
}
}
impl<'a> Wiring for &'a str {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move { self.as_bytes().wiring(wire).await }
}
}
impl<'a> Wiring for &'a [u8] {
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move {
let len = self.len() as u64;
len.wiring(wire).await?;
wire.write_all(self).await
}
}
}
impl<'a, const LEN: usize> Wiring for &'a [u8; LEN] {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move { wire.write_all(self).await }
}
}
impl<const LEN: usize> Wiring for [u8; LEN] {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move { wire.write_all(&self).await }
}
}
impl Wiring for Url {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move { self.as_str().wiring(wire).await }
}
}
impl<'a> Wiring for &'a Url {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move { self.as_str().wiring(wire).await }
}
}
impl<T> Wiring for tokio::sync::oneshot::Sender<T>
where
    T: Unwiring + 'static,
{
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that reads a single `T` from it and
    /// delivers it through this sender. The task ends early if the sender is closed first.
    #[inline]
    fn wiring<W: Wire>(mut self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let mut side_stream: W::Stream = wire.stream().await?;
            tokio::spawn(
                async move {
                    tokio::select! {
                        _ = self.closed() => {},
                        received = side_stream.unwire::<T>() => {
                            // A read error simply ends the task without sending anything.
                            if let Ok(value) = received {
                                self.send(value).ok();
                            }
                        },
                    }
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T> Wiring for UnboundedSender<T>
where
    T: Unwiring + 'static,
{
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that forwards every `T` read from it
    /// into this sender. A second task shuts the write half down once the channel closes.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let side_stream: W::Stream = wire.stream().await?;
            let watcher = self.clone();
            let (mut reader, mut writer) = side_stream.split()?;

            let shutdown_task = tokio::spawn(
                async move {
                    watcher.closed().await;
                    writer.shutdown().await.ok();
                }
                .boxed(),
            );

            tokio::spawn(
                async move {
                    while let Ok(item) = reader.unwire().await {
                        if self.send(item).is_err() {
                            break;
                        }
                    }
                    shutdown_task.abort();
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T: Wiring + 'static + Clone> Wiring for tokio::sync::broadcast::Receiver<T> {
const SAFE: bool = false;
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
async move {
let w = wire.stream().await?;
let (mut r, mut w) = w.split()?;
let mut rx = self;
let task = async move {
while let Ok(item) = rx.recv().await {
if let Err(_) = w.wire(item).await {
break;
}
}
};
let j = tokio::spawn(task.boxed());
let detect_shutdown = async move {
r.read_u8().await.ok();
j.abort();
};
tokio::spawn(detect_shutdown.boxed());
Ok(())
}
}
}
impl<T: Wiring + 'static + Clone> Wiring for tokio::sync::watch::Receiver<T> {
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that writes every value yielded by a
    /// `WatchStream` over this receiver to it. A second task waits for one byte (or an
    /// error / end of stream) on the read half and then aborts the forwarding task.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
        async move {
            let side_stream = wire.stream().await?;
            let (mut reader, mut writer) = side_stream.split()?;
            let mut updates = tokio_stream::wrappers::WatchStream::new(self);

            let forward = tokio::spawn(
                async move {
                    while let Some(value) = updates.next().await {
                        if writer.wire(value).await.is_err() {
                            break;
                        }
                    }
                }
                .boxed(),
            );

            tokio::spawn(
                async move {
                    reader.read_u8().await.ok();
                    forward.abort();
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T: Wiring + Unwiring + 'static + Clone> Wiring for tokio::sync::watch::Sender<T> {
    const SAFE: bool = false;

    /// Opens a stream from `wire`, writes the sender's current value to it, and spawns a
    /// task that pushes every `T` read from the stream into this sender.
    ///
    /// The task shuts the stream down when the watch channel closes, and stops on the
    /// first read or send failure.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
        async move {
            let mut side_stream = wire.stream().await?;
            // Clone in its own statement so the borrow guard is released before awaiting.
            let current = self.borrow().clone();
            side_stream.wire(current).await?;

            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = self.closed() => {
                                side_stream.shutdown().await.ok();
                                break;
                            },
                            item = side_stream.unwire::<T>() => {
                                let Ok(value) = item else { break };
                                if self.send(value).is_err() {
                                    break;
                                }
                            },
                            else => break,
                        };
                    }
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T> Wiring for tokio::sync::broadcast::Sender<T>
where
    T: Unwiring + 'static,
{
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that broadcasts every `T` read from
    /// it. The task stops on the first read error or failed send.
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let mut side_stream: W::Stream = wire.stream().await?;
            tokio::spawn(
                async move {
                    while let Ok(item) = side_stream.unwire().await {
                        if self.send(item).is_err() {
                            break;
                        }
                    }
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T> Wiring for tokio::sync::mpsc::Sender<T>
where
    T: Unwiring + 'static,
{
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that forwards every `T` read from it
    /// into this sender. A second task shuts the write half down once the channel closes.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let side_stream: W::Stream = wire.stream().await?;
            let watcher = self.clone();
            let (mut reader, mut writer) = side_stream.split()?;

            let shutdown_task = tokio::spawn(
                async move {
                    watcher.closed().await;
                    writer.shutdown().await.ok();
                }
                .boxed(),
            );

            tokio::spawn(
                async move {
                    while let Ok(item) = reader.unwire().await {
                        if self.send(item).await.is_err() {
                            break;
                        }
                    }
                    shutdown_task.abort();
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T: Wiring + 'static> Wiring for tokio::sync::oneshot::Receiver<T> {
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that waits for this receiver's value
    /// and writes it to the stream. The task also ends as soon as a byte read on the read
    /// half completes; either way the write half is shut down afterwards.
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let side_stream: W::Stream = wire.stream().await?;
            let (mut reader, mut writer) = side_stream.split()?;
            tokio::spawn(
                async move {
                    tokio::select! {
                        _ = reader.read_u8() => {},
                        outcome = self => {
                            if let Ok(value) = outcome {
                                writer.wire(value).await.ok();
                            }
                        }
                        else => {},
                    }
                    writer.shutdown().await.ok();
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T: Wiring + 'static> Wiring for UnboundedReceiver<T> {
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that writes every item received on
    /// this channel to it. A second task waits for one byte (or an error / end of stream)
    /// on the read half and then aborts the forwarding task.
    fn wiring<W: Wire>(mut self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let side_stream: W::Stream = wire.stream().await?;
            let (mut reader, mut writer) = side_stream.split()?;

            let forward = tokio::spawn(
                async move {
                    while let Some(item) = self.recv().await {
                        if writer.wire(item).await.is_err() {
                            break;
                        }
                    }
                }
                .boxed(),
            );

            tokio::spawn(
                async move {
                    reader.read_u8().await.ok();
                    forward.abort();
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl<T: Wiring + 'static> Wiring for tokio::sync::mpsc::Receiver<T> {
    const SAFE: bool = false;

    /// Opens a stream from `wire` and spawns a task that writes every item received on
    /// this channel to it. A second task waits for one byte (or an error / end of stream)
    /// on the read half and then aborts the forwarding task.
    #[inline]
    fn wiring<W: Wire>(mut self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let side_stream: W::Stream = wire.stream().await?;
            let (mut reader, mut writer) = side_stream.split()?;

            let forward = tokio::spawn(
                async move {
                    while let Some(item) = self.recv().await {
                        if writer.wire(item).await.is_err() {
                            break;
                        }
                    }
                }
                .boxed(),
            );

            tokio::spawn(
                async move {
                    reader.read_u8().await.ok();
                    forward.abort();
                }
                .boxed(),
            );
            Ok(())
        }
    }
}
impl Wiring for () {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send {
wire.wiring(1u8)
}
}
impl Wiring for bool {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u8(self as u8)
}
}
impl<'a> Wiring for &'a bool {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u8(*self as u8)
}
}
impl Wiring for u8 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u8(self)
}
}
impl<'a> Wiring for &'a u8 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u8(*self)
}
}
impl Wiring for i8 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i8(self)
}
}
impl<'a> Wiring for &'a i8 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i8(*self)
}
}
impl Wiring for u16 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u16(self)
}
}
impl<'a> Wiring for &'a u16 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u16(*self)
}
}
impl Wiring for i16 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i16(self)
}
}
impl<'a> Wiring for &'a i16 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i16(*self)
}
}
impl Wiring for u32 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u32(self)
}
}
impl<'a> Wiring for &'a u32 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u32(*self)
}
}
impl Wiring for i32 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i32(self)
}
}
impl<'a> Wiring for &'a i32 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i32(*self)
}
}
impl Wiring for u64 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u64(self)
}
}
impl<'a> Wiring for &'a u64 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u64(*self)
}
}
impl Wiring for i64 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i64(self)
}
}
impl<'a> Wiring for &'a i64 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i64(*self)
}
}
impl Wiring for u128 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u128(self)
}
}
impl<'a> Wiring for &'a u128 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_u128(*self)
}
}
impl Wiring for i128 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i128(self)
}
}
impl<'a> Wiring for &'a i128 {
#[inline]
fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
wire.write_i128(*self)
}
}
impl<T: Wiring + 'static> Wiring for Vec<T> {
    /// Writes the element count as a `u64`, followed by the elements.
    ///
    /// When `T` is `u8` or `i8` the whole buffer is written with a single `write_all`;
    /// otherwise each element is written in order through its own `Wiring` impl.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let len = self.len() as u64;
            len.wiring(wire).await?;
            let elem = TypeId::of::<T>();
            if elem == TypeId::of::<u8>() || elem == TypeId::of::<i8>() {
                // SAFETY: the `TypeId` check above proves `T` is `u8` or `i8`, both one
                // byte wide with alignment 1 and no invalid bit patterns, so the `len`
                // initialized elements are readable as `len` bytes. `self` is not moved or
                // modified in this branch, so the buffer outlives `bytes`. Viewing the
                // element slice avoids transmuting the `Vec` itself, whose layout is
                // unspecified.
                let bytes: &[u8] = unsafe { std::slice::from_raw_parts(self.as_ptr().cast::<u8>(), self.len()) };
                wire.write_all(bytes).await?;
            } else {
                for item in self {
                    item.wiring(wire).await?;
                }
            }
            Ok(())
        }
    }
}
impl<'a, T: Wiring> Wiring for &'a Vec<T>
where
    &'a T: Wiring + 'static,
{
    /// Writes the element count as a `u64`, followed by the elements, without consuming
    /// the vector.
    ///
    /// When `T` is `u8` or `i8` the whole buffer is written with a single `write_all`;
    /// otherwise each element is written in order through the `&T` `Wiring` impl.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let len = self.len() as u64;
            len.wiring(wire).await?;
            let elem = TypeId::of::<T>();
            if elem == TypeId::of::<u8>() || elem == TypeId::of::<i8>() {
                // SAFETY: the `TypeId` check above proves `T` is `u8` or `i8`, both one
                // byte wide with alignment 1 and no invalid bit patterns, so the `len`
                // initialized elements are readable as `len` bytes. The vector is borrowed
                // for `'a`, which outlives this future. Viewing the element slice avoids
                // transmuting the reference to the `Vec`, whose layout is unspecified.
                let bytes: &[u8] = unsafe { std::slice::from_raw_parts(self.as_ptr().cast::<u8>(), self.len()) };
                wire.write_all(bytes).await?;
            } else {
                for item in self.iter() {
                    item.wiring(wire).await?;
                }
            }
            Ok(())
        }
    }
}
impl<T: Wiring> Wiring for std::collections::HashSet<T> {
    /// Writes the number of elements as a `u64`, then every element in the set's
    /// iteration order.
    #[inline]
    fn wiring<W: Wire>(mut self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let count = self.len() as u64;
            count.wiring(wire).await?;
            for item in self.drain() {
                item.wiring(wire).await?;
            }
            Ok(())
        }
    }
}
impl<'a, T: Wiring> Wiring for &'a std::collections::HashSet<T>
where
    &'a T: Wiring + std::fmt::Debug,
{
    /// Writes the number of elements as a `u64`, then every element by reference in the
    /// set's iteration order.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        // The iterator is created up front and moved into the future.
        let items = self.iter();
        async move {
            let count = self.len() as u64;
            count.wiring(wire).await?;
            for item in items {
                item.wiring(wire).await?;
            }
            Ok(())
        }
    }
}
impl<T: Wiring> Wiring for Option<T> {
    /// Writes a one-byte tag: `1u8` followed by the value for `Some`, `0u8` alone for `None`.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            match self {
                Some(value) => {
                    1u8.wiring(wire).await?;
                    value.wiring(wire).await
                }
                None => 0u8.wiring(wire).await,
            }
        }
    }
}
impl<T: Wiring, TT: Wiring> Wiring for (T, TT) {
    /// Writes the two elements in order, stopping at the first error.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let (first, second) = self;
            first.wiring(wire).await?;
            second.wiring(wire).await
        }
    }
}
impl<T: Wiring, TT: Wiring, TTT: Wiring> Wiring for (T, TT, TTT) {
    /// Writes the three elements in order, stopping at the first error.
    #[inline]
    fn wiring<W: Wire>(self, wire: &mut W) -> impl std::future::Future<Output = Result<(), std::io::Error>> {
        async move {
            let (first, second, third) = self;
            first.wiring(wire).await?;
            second.wiring(wire).await?;
            third.wiring(wire).await
        }
    }
}