#![cfg(all(unix, not(target_os = "emscripten")))]
#![cfg_attr(docsrs, doc(cfg(all(unix, not(target_os = "emscripten")))))]
use futures::{prelude::*, future::{BoxFuture, Ready}};
use futures::stream::BoxStream;
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{ListenerEvent, TransportError}
};
use log::debug;
use std::{io, path::PathBuf};
macro_rules! codegen {
($feature_name:expr, $uds_config:ident, $build_listener:expr, $unix_stream:ty, $($mut_or_not:tt)*) => {
#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))]
#[derive(Debug, Clone)]
pub struct $uds_config {
}
impl $uds_config {
pub fn new() -> $uds_config {
$uds_config {}
}
}
impl Transport for $uds_config {
type Output = $unix_stream;
type Error = io::Error;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
if let Ok(path) = multiaddr_to_path(&addr) {
Ok(async move { $build_listener(&path).await }
.map_ok(move |listener| {
stream::once({
let addr = addr.clone();
async move {
debug!("Now listening on {}", addr);
Ok(ListenerEvent::NewAddress(addr))
}
}).chain(stream::unfold(listener, move |$($mut_or_not)* listener| {
let addr = addr.clone();
async move {
let (stream, _) = match listener.accept().await {
Ok(v) => v,
Err(err) => return Some((Err(err), listener))
};
debug!("incoming connection on {}", addr);
let event = ListenerEvent::Upgrade {
upgrade: future::ok(stream),
local_addr: addr.clone(),
remote_addr: addr.clone()
};
Some((Ok(event), listener))
}
}))
})
.try_flatten_stream()
.boxed())
} else {
Err(TransportError::MultiaddrNotSupported(addr))
}
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
if let Ok(path) = multiaddr_to_path(&addr) {
debug!("Dialing {}", addr);
Ok(async move { <$unix_stream>::connect(&path).await }.boxed())
} else {
Err(TransportError::MultiaddrNotSupported(addr))
}
}
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
}
};
}
#[cfg(feature = "async-std")]
codegen!(
"async-std",
UdsConfig,
|addr| async move { async_std::os::unix::net::UnixListener::bind(addr).await },
async_std::os::unix::net::UnixStream,
);
#[cfg(feature = "tokio")]
codegen!(
"tokio",
TokioUdsConfig,
|addr| async move { tokio::net::UnixListener::bind(addr) },
tokio::net::UnixStream,
mut
);
fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> {
let mut protocols = addr.iter();
match protocols.next() {
Some(Protocol::Unix(ref path)) => {
let path = PathBuf::from(path.as_ref());
if !path.is_absolute() {
return Err(())
}
match protocols.next() {
None | Some(Protocol::P2p(_)) => Ok(path),
Some(_) => Err(())
}
}
_ => Err(())
}
}
#[cfg(all(test, feature = "async-std"))]
mod tests {
use super::{multiaddr_to_path, UdsConfig};
use futures::{channel::oneshot, prelude::*};
use std::{self, borrow::Cow, path::Path};
use libp2p_core::{Transport, multiaddr::{Protocol, Multiaddr}};
use tempfile;
#[test]
fn multiaddr_to_path_conversion() {
assert!(
multiaddr_to_path(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
.is_err()
);
assert_eq!(
multiaddr_to_path(&Multiaddr::from(Protocol::Unix("/tmp/foo".into()))),
Ok(Path::new("/tmp/foo").to_owned())
);
assert_eq!(
multiaddr_to_path(&Multiaddr::from(Protocol::Unix("/home/bar/baz".into()))),
Ok(Path::new("/home/bar/baz").to_owned())
);
}
#[test]
fn communicating_between_dialer_and_listener() {
let temp_dir = tempfile::tempdir().unwrap();
let socket = temp_dir.path().join("socket");
let addr = Multiaddr::from(Protocol::Unix(Cow::Owned(socket.to_string_lossy().into_owned())));
let (tx, rx) = oneshot::channel();
async_std::task::spawn(async move {
let mut listener = UdsConfig::new().listen_on(addr).unwrap();
let listen_addr = listener.try_next().await.unwrap()
.expect("some event")
.into_new_address()
.expect("listen address");
tx.send(listen_addr).unwrap();
let (sock, _addr) = listener.try_filter_map(|e| future::ok(e.into_upgrade()))
.try_next()
.await
.unwrap()
.expect("some event");
let mut sock = sock.await.unwrap();
let mut buf = [0u8; 3];
sock.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
});
async_std::task::block_on(async move {
let uds = UdsConfig::new();
let addr = rx.await.unwrap();
let mut socket = uds.dial(addr).unwrap().await.unwrap();
socket.write(&[1, 2, 3]).await.unwrap();
});
}
#[test]
#[ignore] fn larger_addr_denied() {
let uds = UdsConfig::new();
let addr = "/unix//foo/bar"
.parse::<Multiaddr>()
.unwrap();
assert!(uds.listen_on(addr).is_err());
}
#[test]
#[ignore] fn relative_addr_denied() {
assert!("/unix/./foo/bar".parse::<Multiaddr>().is_err());
}
}