use std::future::Future;
use std::pin::Pin;
use futures::StreamExt;
use futures::{pin_mut, select, stream::FuturesUnordered, FutureExt};
#[cfg(not(target_arch = "wasm32"))]
use super::Tcp;
#[cfg(unix)]
use super::Unix;
use crate::channel::handshake::Handshake;
use crate::Channel;
use crate::Result;
use super::WebSocket;
/// Wraps one of the supported transport providers (`Tcp`, `Unix`, `WebSocket`).
///
/// Every transport appears twice. The plain variant is reported as encrypted by
/// `AnyProvider::encrypted`; the matching `Insecure*` variant is not.
pub enum AnyProvider {
/// TCP provider treated as encrypted. Not available on `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
Tcp(Tcp),
/// TCP provider treated as unencrypted. Not available on `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
InsecureTcp(Tcp),
/// Unix provider treated as encrypted. Only available on `unix` targets.
#[cfg(unix)]
Unix(Unix),
/// Unix provider treated as unencrypted. Only available on `unix` targets.
#[cfg(unix)]
InsecureUnix(Unix),
/// WebSocket provider treated as encrypted.
Wss(WebSocket),
/// WebSocket provider treated as unencrypted.
InsecureWss(WebSocket),
}
impl AnyProvider {
    /// Awaits the next [`Handshake`] from whichever provider this value wraps.
    ///
    /// The call is forwarded unchanged to the wrapped provider's `next()`;
    /// the secure/insecure distinction plays no role here.
    ///
    /// Only compiled for targets other than `wasm32`.
    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn next_handshake(&self) -> Result<Handshake> {
        match self {
            Self::Tcp(tcp) | Self::InsecureTcp(tcp) => tcp.next().await,
            #[cfg(unix)]
            Self::Unix(unix) | Self::InsecureUnix(unix) => unix.next().await,
            Self::Wss(ws) | Self::InsecureWss(ws) => ws.next().await,
        }
    }

    /// Returns `true` for the `Tcp`, `Unix` and `Wss` variants and `false`
    /// for their `Insecure*` counterparts.
    ///
    /// Only compiled for targets other than `wasm32`.
    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn encrypted(&self) -> bool {
        match self {
            Self::Tcp(_) | Self::Wss(_) => true,
            #[cfg(unix)]
            Self::Unix(_) => true,
            Self::InsecureTcp(_) | Self::InsecureWss(_) => false,
            #[cfg(unix)]
            Self::InsecureUnix(_) => false,
        }
    }

    /// Consumes the provider and wraps it in a [`ChannelIter`] that starts
    /// with an empty set of pending futures.
    ///
    /// Only compiled for targets other than `wasm32`.
    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn channels(self) -> ChannelIter {
        let futures = FuturesUnordered::new();
        ChannelIter {
            futures,
            listener: self,
        }
    }
}
/// Produces [`Channel`]s from an [`AnyProvider`]; created by `AnyProvider::channels`.
pub struct ChannelIter {
/// Provider that `next` pulls handshakes from.
listener: AnyProvider,
/// Pending boxed futures, each resolving to a `Result<Channel>`. `next` pushes
/// the future returned by `Handshake::encrypted` here when the provider reports
/// itself as encrypted.
futures: FuturesUnordered<Pin<Box<dyn Future<Output = Result<Channel>> + Send + 'static>>>, }
impl ChannelIter {
    /// Resolves to the next ready [`Channel`].
    ///
    /// Two sources are raced against each other:
    ///
    /// * the pending futures already stored in `self.futures`, and
    /// * the provider's next incoming handshake.
    ///
    /// When the provider reports itself as encrypted, an accepted handshake is
    /// turned into a future via `Handshake::encrypted` and parked in
    /// `self.futures`; the method then keeps waiting, both for that future and
    /// for further handshakes. Otherwise the handshake is converted with
    /// `Handshake::raw` and returned immediately.
    ///
    /// Futures still pending when this method returns stay in `self.futures`
    /// and are polled again by the next call.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the provider's `next_handshake`, or the
    /// error a pending future in `self.futures` resolved to.
    pub async fn next(&mut self) -> Result<Channel> {
        let accept = self.listener.next_handshake().fuse();
        pin_mut!(accept);
        loop {
            select! {
                // `select_next_some` parks this branch while the set is empty.
                // Polling `self.futures.next()` directly would resolve to
                // `None` at once and make this loop spin without ever
                // yielding to the executor.
                chan = self.futures.select_next_some() => return chan,
                res = accept => {
                    let hs: Handshake = res?;
                    if !self.listener.encrypted() {
                        return Ok(hs.raw());
                    }
                    self.futures.push(Box::pin(hs.encrypted()));
                    // A completed fused future is skipped by `select!`, so
                    // re-arm it; otherwise no further handshake is accepted
                    // while the one just pushed is still in flight.
                    // NOTE(review): the in-flight accept is dropped whenever a
                    // pending future wins the race (as before) — confirm the
                    // providers' `next()` tolerates being cancelled.
                    accept.set(self.listener.next_handshake().fuse());
                },
            }
        }
    }
}