use async_broadcast::{broadcast, InactiveReceiver, Sender as Broadcaster};
use event_listener::EventListener;
use once_cell::sync::OnceCell;
use ordered_stream::{OrderedFuture, OrderedStream, PollResult};
use static_assertions::assert_impl_all;
use std::{
collections::{HashMap, HashSet},
convert::TryInto,
future::ready,
io::{self, ErrorKind},
ops::Deref,
pin::Pin,
sync::{
self,
atomic::{AtomicU32, Ordering::SeqCst},
Arc, Weak,
},
task::{Context, Poll},
};
use tracing::{debug, instrument, trace, trace_span, Instrument};
use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
use zvariant::ObjectPath;
use futures_core::{ready, Future};
use futures_sink::Sink;
use futures_util::{sink::SinkExt, StreamExt, TryFutureExt};
use crate::{
async_channel::{channel, Receiver, Sender},
async_lock::Mutex,
blocking, fdo,
raw::{Connection as RawConnection, Socket},
Authenticated, CacheProperties, ConnectionBuilder, DBusError, Error, Executor, Guid, Message,
MessageStream, MessageType, ObjectServer, Result, Task,
};
/// Capacity passed to `broadcast` when `Connection::new` creates the channel that carries
/// incoming messages. The capacity can be changed later via `Connection::set_max_queued`.
const DEFAULT_MAX_QUEUED: usize = 64;
/// State shared (behind an `Arc`) by every clone of a `Connection`.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
/// GUID taken from the authentication result in `Connection::new`.
server_guid: Guid,
/// Copied from the authentication result. When `false`, sending a message that carries file
/// descriptors is rejected with `Error::Unsupported`.
#[cfg(unix)]
cap_unix_fd: bool,
/// Set from the `bus_connection` argument of `Connection::new`; reported by `is_bus`.
bus_conn: bool,
/// Set once by `hello_bus` from the reply to the `Hello` call.
unique_name: OnceCell<OwnedUniqueName>,
/// Well-known names recorded by `request_name` and cleared by `release_name`.
registered_names: Mutex<HashSet<WellKnownName<'static>>>,
/// The underlying raw connection, shared with the message receiver task.
raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
/// Counter backing `next_serial`; starts at 1.
serial: AtomicU32,
/// Executor on which the connection's background tasks are spawned.
executor: Executor<'static>,
// Never read after construction; presumably held only to keep the task alive for as long as the
// connection exists — TODO confirm against `Task`'s drop behaviour.
#[allow(unused)]
msg_receiver_task: Task<()>,
/// Match rule expression -> number of active subscriptions (see `add_match`/`remove_match`).
signal_matches: Mutex<HashMap<String, u64>>,
/// Lazily created by `sync_object_server`.
object_server: OnceCell<blocking::ObjectServer>,
/// Lazily created by `start_object_server`.
object_server_dispatch_task: OnceCell<Task<()>>,
}
/// Background job that reads messages from the socket and hands them to the connection's
/// channels.
#[derive(Debug)]
struct MessageReceiverTask {
/// Raw connection the messages are read from.
raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
/// Every received message is broadcast through this sender.
msg_sender: Broadcaster<Arc<Message>>,
/// A socket read error is sent through this sender before the task stops.
error_sender: Sender<Error>,
}
impl MessageReceiverTask {
    /// Bundles the shared raw connection with the two channel senders and wraps the result in
    /// an `Arc`.
    fn new(
        raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
        msg_sender: Broadcaster<Arc<Message>>,
        error_sender: Sender<Error>,
    ) -> Arc<Self> {
        let task = Self {
            raw_conn,
            msg_sender,
            error_sender,
        };

        Arc::new(task)
    }

    /// Spawns the receive loop on `executor` and returns the handle of the spawned task.
    fn spawn(self: Arc<Self>, executor: &Executor<'_>) -> Task<()> {
        let receive_loop = async move {
            self.receive_msg().await;
        };

        executor.spawn(receive_loop)
    }

    /// Reads messages from the socket in a loop and broadcasts each one.
    ///
    /// The loop ends in two situations:
    /// * reading fails: the error is sent on the error channel, then both channels are closed;
    /// * broadcasting fails: the failure is logged and the loop simply returns.
    #[instrument(skip(self))]
    async fn receive_msg(self: Arc<Self>) {
        loop {
            trace!("Waiting for message on the socket..");
            let incoming = ReceiveMessage {
                raw_conn: &self.raw_conn,
            }
            .await;

            let msg = match incoming {
                Err(e) => {
                    trace!("Error reading from the socket: {:?}", e);
                    self.error_sender.send(e).await;
                    self.msg_sender.close();
                    self.error_sender.close().await;
                    trace!("Socket reading task stopped");
                    return;
                }
                Ok(msg) => msg,
            };
            trace!("Message received on the socket: {:?}", msg);

            // Streams receive the message behind an `Arc`, so broadcasting only clones the
            // pointer.
            let msg = Arc::new(msg);
            if let Err(e) = self.msg_sender.broadcast(msg.clone()).await {
                debug!("Error broadcasting message to streams: {:?}", e);
                return;
            }
            trace!("Message broadcasted to all streams: {:?}", msg);
        }
    }
}
/// Cloneable handle to a D-Bus connection.
///
/// Clones share the same `ConnectionInner`; each clone carries its own handles to the message and
/// error channels.
#[derive(Clone, Debug)]
pub struct Connection {
/// State shared between all clones.
pub(crate) inner: Arc<ConnectionInner>,
/// Inactive handle to the broadcast channel of incoming messages.
pub(crate) msg_receiver: InactiveReceiver<Arc<Message>>,
/// Receiving end of the channel on which the receiver task reports a socket read error.
pub(crate) error_receiver: Receiver<Error>,
}
// Compile-time check that `Connection` is `Send`, `Sync` and `Unpin`.
assert_impl_all!(Connection: Send, Sync, Unpin);
/// Future for the reply to a method call that has already been sent
/// (see `Connection::call_method_raw`).
#[derive(Debug)]
pub(crate) struct PendingMethodCall {
/// Stream scanned for the reply; set to `None` once a reply has been returned.
stream: Option<MessageStream>,
/// Serial of the sent call; a reply is recognised by its `reply_serial` matching this value.
serial: u32,
}
impl Future for PendingMethodCall {
type Output = Result<Arc<Message>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_before(cx, None).map(|ret| {
ret.map(|(_, r)| r).unwrap_or_else(|| {
Err(crate::Error::Io(io::Error::new(
ErrorKind::BrokenPipe,
"socket closed",
)))
})
})
}
}
impl OrderedFuture for PendingMethodCall {
    type Output = Result<Arc<Message>>;
    type Ordering = zbus::MessageSequence;

    /// Scans the underlying stream for the reply to the pending call.
    ///
    /// Messages whose `reply_serial` differs from the call's serial, or that are neither a
    /// method return nor an error, are skipped. A method return resolves to `Ok`, an error
    /// message to `Err`; in both cases the stream is dropped. A stream error is passed through.
    /// `None` is returned when the stream reports `NoneBefore` or `Terminated`, or when the
    /// stream has already been dropped.
    fn poll_before(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        before: Option<&Self::Ordering>,
    ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
        let this = self.get_mut();
        let stream = match this.stream.as_mut() {
            Some(stream) => stream,
            None => return Poll::Ready(None),
        };

        loop {
            let (msg, ordering) = match Pin::new(&mut *stream).poll_next_before(cx, before) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(PollResult::NoneBefore) | Poll::Ready(PollResult::Terminated) => {
                    return Poll::Ready(None);
                }
                Poll::Ready(PollResult::Item {
                    data: Err(e),
                    ordering,
                }) => return Poll::Ready(Some((ordering, Err(e)))),
                Poll::Ready(PollResult::Item {
                    data: Ok(msg),
                    ordering,
                }) => (msg, ordering),
            };

            // Not a reply to our call: keep looking.
            let is_our_reply = msg.reply_serial() == Some(this.serial);
            if !is_our_reply {
                continue;
            }

            let res = match msg.message_type() {
                MessageType::MethodReturn => Ok(msg),
                MessageType::Error => Err(msg.into()),
                _ => continue,
            };

            // The reply has been found; the stream is no longer needed.
            this.stream = None;
            return Poll::Ready(Some((ordering, res)));
        }
    }
}
impl Connection {
pub async fn send_message(&self, mut msg: Message) -> Result<u32> {
let serial = self.assign_serial_num(&mut msg)?;
trace!("Sending message: {:?}", msg);
(&mut &*self).send(msg).await?;
trace!("Sent message with serial: {}", serial);
Ok(serial)
}
/// Sends a method call and waits for its reply.
///
/// The call is built with `Message::method` (using this connection's unique name as sender),
/// sent through `call_method_raw`, and the resulting pending call is awaited.
///
/// # Errors
///
/// Fails if the message cannot be built or sent, if the reply is a D-Bus error message, or if
/// the reply stream ends before a reply arrives.
pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
    &self,
    destination: Option<D>,
    path: P,
    interface: Option<I>,
    method_name: M,
    body: &B,
) -> Result<Arc<Message>>
where
    D: TryInto<BusName<'d>>,
    P: TryInto<ObjectPath<'p>>,
    I: TryInto<InterfaceName<'i>>,
    M: TryInto<MemberName<'m>>,
    D::Error: Into<Error>,
    P::Error: Into<Error>,
    I::Error: Into<Error>,
    M::Error: Into<Error>,
    B: serde::ser::Serialize + zvariant::DynamicType,
{
    let sender = self.unique_name();
    let call = Message::method(sender, destination, path, interface, method_name, body)?;

    let pending_reply = self.call_method_raw(call).await?;
    pending_reply.await
}
/// Sends an already-built method call and returns a future for its reply.
///
/// The reply stream is created before the message is sent, so it exists by the time any reply
/// can arrive.
pub(crate) async fn call_method_raw(&self, msg: Message) -> Result<PendingMethodCall> {
    debug_assert_eq!(msg.message_type(), MessageType::MethodCall);

    let reply_stream = MessageStream::from(self.clone());
    let serial = self.send_message(msg).await?;

    Ok(PendingMethodCall {
        stream: Some(reply_stream),
        serial,
    })
}
/// Builds a signal with `Message::signal` (using this connection's unique name as sender) and
/// sends it.
///
/// # Errors
///
/// Fails if the message cannot be built or sent.
pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
    &self,
    destination: Option<D>,
    path: P,
    interface: I,
    signal_name: M,
    body: &B,
) -> Result<()>
where
    D: TryInto<BusName<'d>>,
    P: TryInto<ObjectPath<'p>>,
    I: TryInto<InterfaceName<'i>>,
    M: TryInto<MemberName<'m>>,
    D::Error: Into<Error>,
    P::Error: Into<Error>,
    I::Error: Into<Error>,
    M::Error: Into<Error>,
    B: serde::ser::Serialize + zvariant::DynamicType,
{
    let sender = self.unique_name();
    let signal = Message::signal(sender, destination, path, interface, signal_name, body)?;

    // The serial of the sent signal is of no interest to the caller.
    self.send_message(signal).await?;
    Ok(())
}
/// Replies to the method call `call` with `body` and returns the serial of the sent reply.
pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<u32>
where
    B: serde::ser::Serialize + zvariant::DynamicType,
{
    let sender = self.unique_name();
    let method_return = Message::method_reply(sender, call, body)?;

    self.send_message(method_return).await
}
/// Replies to the method call `call` with an error message named `error_name` carrying `body`,
/// and returns the serial of the sent reply.
pub async fn reply_error<'e, E, B>(
    &self,
    call: &Message,
    error_name: E,
    body: &B,
) -> Result<u32>
where
    B: serde::ser::Serialize + zvariant::DynamicType,
    E: TryInto<ErrorName<'e>>,
    E::Error: Into<Error>,
{
    let sender = self.unique_name();
    let error_reply = Message::method_error(sender, call, error_name, body)?;

    self.send_message(error_reply).await
}
/// Sends the reply that `err` creates for the call header `call`, returning the serial of the
/// sent message.
pub async fn reply_dbus_error(
    &self,
    call: &zbus::MessageHeader<'_>,
    err: impl DBusError,
) -> Result<u32> {
    let error_reply = err.create_reply(call)?;

    self.send_message(error_reply).await
}
pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
where
W: TryInto<WellKnownName<'w>>,
W::Error: Into<Error>,
{
let well_known_name = well_known_name.try_into().map_err(Into::into)?;
let mut names = self.inner.registered_names.lock().await;
if names.contains(&well_known_name) {
return Ok(());
}
let reply = fdo::DBusProxy::builder(self)
.cache_properties(CacheProperties::No)
.build()
.await?
.request_name(
well_known_name.clone(),
fdo::RequestNameFlags::ReplaceExisting | fdo::RequestNameFlags::DoNotQueue,
)
.await?;
if let fdo::RequestNameReply::Exists = reply {
Err(Error::NameTaken)
} else {
names.insert(well_known_name.to_owned());
Ok(())
}
}
/// Releases `well_known_name` on the bus.
///
/// Returns `Ok(false)` without contacting the bus when the name is not recorded as registered
/// by this connection, and `Ok(true)` once the bus call has succeeded.
///
/// The name is dropped from the local registry only after the bus call succeeds. If the call
/// fails, the name stays recorded so that a later `release_name` can retry instead of
/// reporting `Ok(false)` for a name the connection may still own.
///
/// # Errors
///
/// Any error from the conversion of the name or from the bus call is passed through.
pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
where
    W: TryInto<WellKnownName<'w>>,
    W::Error: Into<Error>,
{
    let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;

    // The lock is kept for the whole operation, which serialises concurrent releases.
    let mut names = self.inner.registered_names.lock().await;
    let owned_name = well_known_name.to_owned();
    if !names.contains(&owned_name) {
        return Ok(false);
    }

    let released = fdo::DBusProxy::builder(self)
        .cache_properties(CacheProperties::No)
        .build()
        .await?
        .release_name(well_known_name)
        .await
        .map(|_| true)
        .map_err(Into::into);

    // Forget the name only once the bus has accepted the release.
    if released.is_ok() {
        names.remove(&owned_name);
    }

    released
}
/// Returns the flag that `Connection::new` received as its `bus_connection` argument.
pub fn is_bus(&self) -> bool {
    let inner = &self.inner;
    inner.bus_conn
}
/// Ensures `msg` has a serial number and returns it.
///
/// The serial is obtained from the primary header's `serial_num_or_init`, with this
/// connection's next serial supplied as the initialiser.
pub fn assign_serial_num(&self, msg: &mut Message) -> Result<u32> {
    let mut assigned = 0;

    msg.modify_primary_header(|header| {
        let serial = header.serial_num_or_init(|| self.next_serial());
        assigned = *serial;
        Ok(())
    })?;

    Ok(assigned)
}
/// Returns the unique name of this connection, or `None` if it has not been set.
///
/// The name is set by the `Hello` handshake that `Connection::new` performs for bus
/// connections.
pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
    let cell = &self.inner.unique_name;
    cell.get()
}
/// Returns the capacity of the channel that carries incoming messages.
pub fn max_queued(&self) -> usize {
    let receiver = &self.msg_receiver;
    receiver.capacity()
}
/// Sets the capacity of the channel that carries incoming messages to `max`.
pub fn set_max_queued(&mut self, max: usize) {
    let receiver = &mut self.msg_receiver;
    receiver.set_capacity(max);
}
/// Returns the server GUID obtained during authentication, as a string slice.
pub fn server_guid(&self) -> &str {
    let guid = &self.inner.server_guid;
    guid.as_str()
}
/// Returns the executor on which this connection spawns its tasks.
pub fn executor(&self) -> &Executor<'static> {
    let inner = &self.inner;
    &inner.executor
}
/// Returns a handle that dereferences to the connection's `ObjectServer`.
///
/// The server is created on first use, and the dispatch task is started as part of that
/// creation.
pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
    // Adapter exposing the async server that the stored blocking server wraps.
    struct InnerServer<'a>(&'a blocking::ObjectServer);

    impl Deref for InnerServer<'_> {
        type Target = ObjectServer;

        fn deref(&self) -> &Self::Target {
            let blocking_server = self.0;
            blocking_server.inner()
        }
    }

    let blocking_server = self.sync_object_server(true);
    InnerServer(blocking_server)
}
/// Returns the blocking object server, creating it on first use.
///
/// `start` is only consulted when the server is created by this call; it is then forwarded to
/// `setup_object_server`.
pub(crate) fn sync_object_server(&self, start: bool) -> &blocking::ObjectServer {
    let cell = &self.inner.object_server;
    cell.get_or_init(|| self.setup_object_server(start))
}
/// Creates a new blocking object server for this connection, first starting the dispatch task
/// when `start` is `true`.
fn setup_object_server(&self, start: bool) -> blocking::ObjectServer {
    match start {
        true => self.start_object_server(),
        false => (),
    }

    blocking::ObjectServer::new(self)
}
/// Starts the task that dispatches incoming method calls to the object server.
///
/// The task is created through `get_or_init`, so calling this more than once spawns it only
/// once.
#[instrument(skip(self))]
pub(crate) fn start_object_server(&self) {
self.inner.object_server_dispatch_task.get_or_init(|| {
trace!("starting ObjectServer task");
// The long-running task only holds a weak handle, so it does not by itself keep the shared
// connection state alive.
let weak_conn = WeakConnection::from(self);
// Keep only successfully received messages of type `MethodCall`; `Err` items evaluate to
// `false` through `unwrap_or_default` and are filtered out as well.
let mut stream = MessageStream::from(self.clone()).filter(|msg| {
ready(msg.as_ref().map(|m| m.message_type() == MessageType::MethodCall).unwrap_or_default())
});
self.inner.executor.spawn(
async move {
trace!("waiting for incoming method call messages..");
// NOTE(review): the filter above already drops `Err` items, so the error branch below looks
// unreachable — confirm.
while let Some(msg) = stream.next().await.and_then(|m| {
if let Err(e) = &m {
debug!("Error while reading from object server stream: {:?}", e);
}
m.ok()
}) {
trace!("Got `{}`. Will spawn a task for dispatch..", msg);
if let Some(conn) = weak_conn.upgrade() {
let executor = conn.inner.executor.clone();
// Each call is dispatched on its own detached task, so this loop does not wait for a
// handler to finish before picking up the next call.
executor
.spawn(
async move {
trace!("spawned a task to dispatch `{}`.", msg);
let server = conn.object_server();
if let Err(e) = server.dispatch_message(&msg).await {
debug!(
"Error dispatching message. Message: {:?}, error: {:?}",
msg, e
);
}
}
.instrument(trace_span!("ObjectServer method task"))
)
.detach();
} else {
// The weak handle could not be upgraded: every `Connection` is gone, so stop.
trace!("Connection is gone, stopping associated object server task");
break;
}
}
}
.instrument(trace_span!("ObjectServer task")),
)
});
}
/// Registers one more subscription for the match rule `expr`.
///
/// Does nothing on non-bus connections. On a bus connection, the first subscription for a rule
/// sends `AddMatch` to the bus and records a count of 1; further subscriptions only increment
/// the count.
pub(crate) async fn add_match(&self, expr: String) -> Result<()> {
    use std::collections::hash_map::Entry;

    if !self.is_bus() {
        return Ok(());
    }

    let mut subscriptions = self.inner.signal_matches.lock().await;
    match subscriptions.entry(expr) {
        Entry::Occupied(mut slot) => {
            *slot.get_mut() += 1;
        }
        Entry::Vacant(slot) => {
            // The entry is inserted only after the bus has accepted the rule.
            fdo::DBusProxy::builder(self)
                .cache_properties(CacheProperties::No)
                .build()
                .await?
                .add_match(slot.key())
                .await?;
            slot.insert(1);
        }
    }

    Ok(())
}
/// Drops one subscription for the match rule `expr`.
///
/// Returns `Ok(true)` on non-bus connections (nothing to do) and when a subscription was
/// dropped, and `Ok(false)` when no subscription for `expr` is recorded.
///
/// When the last subscription is dropped, `RemoveMatch` is sent to the bus and the entry is
/// removed. The bookkeeping is changed only after the bus call succeeds: if the call fails,
/// the count is left untouched, so a retry sends `RemoveMatch` again instead of decrementing a
/// count that is already zero.
///
/// # Errors
///
/// Any error from building the proxy or from the `RemoveMatch` call is passed through.
pub(crate) async fn remove_match(&self, expr: String) -> Result<bool> {
    use std::collections::hash_map::Entry;

    if !self.is_bus() {
        return Ok(true);
    }

    let mut subscriptions = self.inner.signal_matches.lock().await;
    match subscriptions.entry(expr) {
        Entry::Vacant(_) => Ok(false),
        Entry::Occupied(mut e) => {
            if *e.get() > 1 {
                // Other subscribers remain; only the count changes.
                *e.get_mut() -= 1;
            } else {
                // Last subscriber: remove the rule from the bus first, then forget it locally.
                fdo::DBusProxy::builder(self)
                    .cache_properties(CacheProperties::No)
                    .build()
                    .await?
                    .remove_match(e.key())
                    .await?;
                e.remove();
            }

            Ok(true)
        }
    }
}
/// Spawns a detached task on the connection's executor that calls `remove_match(expr)`.
///
/// The result of the removal is not reported back to the caller.
pub(crate) fn queue_remove_match(&self, expr: String) {
    let conn = self.clone();
    let removal = async move { conn.remove_match(expr).await };

    self.inner.executor.spawn(removal).detach()
}
/// Performs the `Hello` call on the bus and stores the returned unique name.
///
/// # Panics
///
/// Panics if the unique name has already been set.
async fn hello_bus(&self) -> Result<()> {
    let dbus_proxy = fdo::DBusProxy::builder(self)
        .cache_properties(CacheProperties::No)
        .build()
        .await?;

    let hello = dbus_proxy.hello().map_err(Into::into);
    let name = self.run_future_at_init(hello).await?;

    let unique_name = &self.inner.unique_name;
    unique_name
        .set(name)
        .expect("Attempted to set unique_name twice");

    Ok(())
}
/// Drives `future` to completion through the connection executor's `run` method and returns
/// its result.
pub(crate) async fn run_future_at_init<F, O>(&self, future: F) -> Result<O>
where
    F: Future<Output = Result<O>>,
{
    let executor = &self.inner.executor;
    executor.run(future).await
}
/// Creates a connection from an authenticated socket.
///
/// Sets up the message and error channels, spawns the socket reader task on a fresh executor
/// and, for bus connections, performs the `Hello` handshake before returning.
///
/// * `bus_connection` — whether this is a message-bus connection.
/// * `internal_executor` — when `true` (and the `tokio` feature is off), a dedicated thread is
///   spawned that ticks the executor until it is empty.
pub(crate) async fn new(
    auth: Authenticated<Box<dyn Socket>>,
    bus_connection: bool,
    #[allow(unused)] internal_executor: bool,
) -> Result<Self> {
    let auth = auth.into_inner();
    #[cfg(unix)]
    let cap_unix_fd = auth.cap_unix_fd;

    // Channels between the socket reader task and the connection handles.
    let (msg_sender, msg_receiver) = broadcast(DEFAULT_MAX_QUEUED);
    let msg_receiver = msg_receiver.deactivate();
    let (error_sender, error_receiver) = channel(1);

    let executor = Executor::new();
    let raw_conn = Arc::new(sync::Mutex::new(auth.conn));

    let receiver_task = MessageReceiverTask::new(raw_conn.clone(), msg_sender, error_sender);
    let msg_receiver_task = receiver_task.spawn(&executor);

    let inner = ConnectionInner {
        raw_conn,
        server_guid: auth.server_guid,
        #[cfg(unix)]
        cap_unix_fd,
        bus_conn: bus_connection,
        serial: AtomicU32::new(1),
        unique_name: OnceCell::new(),
        signal_matches: Mutex::new(HashMap::new()),
        object_server: OnceCell::new(),
        object_server_dispatch_task: OnceCell::new(),
        executor: executor.clone(),
        msg_receiver_task,
        registered_names: Mutex::new(HashSet::new()),
    };
    let connection = Self {
        error_receiver,
        msg_receiver,
        inner: Arc::new(inner),
    };

    #[cfg(not(feature = "tokio"))]
    if internal_executor {
        std::thread::Builder::new()
            .name("zbus::Connection executor".into())
            .spawn(move || {
                crate::utils::block_on(async move {
                    while !executor.is_empty() {
                        executor.tick().await;
                    }
                })
            })?;
    }

    // Only bus connections perform the `Hello` handshake.
    if bus_connection {
        connection.hello_bus().await?;
    }

    Ok(connection)
}
/// Returns the next message serial number for this connection.
///
/// Serials come from an atomic counter that starts at 1. The counter wraps around after
/// `u32::MAX`; zero is not a valid D-Bus serial, so a wrapped-around zero is skipped.
fn next_serial(&self) -> u32 {
    loop {
        let serial = self.inner.serial.fetch_add(1, SeqCst);
        if serial != 0 {
            return serial;
        }
    }
}
/// Builds a connection using `ConnectionBuilder::session`.
pub async fn session() -> Result<Self> {
    let builder = ConnectionBuilder::session()?;
    builder.build().await
}
/// Builds a connection using `ConnectionBuilder::system`.
pub async fn system() -> Result<Self> {
    let builder = ConnectionBuilder::system()?;
    builder.build().await
}
/// Returns the `EventListener` produced by the raw connection's `monitor_activity`.
///
/// # Panics
///
/// Panics if the raw connection's lock is poisoned.
pub fn monitor_activity(&self) -> EventListener {
    let lock_result = self.inner.raw_conn.lock();
    lock_result.expect("poisoned lock").monitor_activity()
}
}
/// `Sink` implementation for an owned `Connection`; every method forwards to the
/// implementation for `&Connection`.
impl<T> Sink<T> for Connection
where
    T: Into<Arc<Message>>,
{
    type Error = Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut shared: &Connection = &*self;
        <&Connection as Sink<Arc<Message>>>::poll_ready(Pin::new(&mut shared), cx)
    }

    fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()> {
        let mut shared: &Connection = &*self;
        Pin::new(&mut shared).start_send(msg)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut shared: &Connection = &*self;
        <&Connection as Sink<Arc<Message>>>::poll_flush(Pin::new(&mut shared), cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut shared: &Connection = &*self;
        <&Connection as Sink<Arc<Message>>>::poll_close(Pin::new(&mut shared), cx)
    }
}
/// `Sink` implementation that sends messages through a shared reference to the connection.
impl<'a, T> Sink<T> for &'a Connection
where
    T: Into<Arc<Message>>,
{
    type Error = Error;

    /// Always ready: messages are queued on the raw connection by `start_send`.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Queues `msg` on the raw connection.
    ///
    /// On Unix, a message carrying file descriptors is rejected with `Error::Unsupported` when
    /// the connection's `cap_unix_fd` flag is `false`.
    fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()> {
        let msg = msg.into();

        #[cfg(unix)]
        if !msg.fds().is_empty() && !self.inner.cap_unix_fd {
            return Err(Error::Unsupported);
        }

        let lock_result = self.inner.raw_conn.lock();
        lock_result.expect("poisoned lock").enqueue_message(msg);

        Ok(())
    }

    /// Flushes the raw connection.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let lock_result = self.inner.raw_conn.lock();
        lock_result.expect("poisoned lock").flush(cx)
    }

    /// Flushes the raw connection and, once that has completed successfully, closes it.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut raw_conn = self.inner.raw_conn.lock().expect("poisoned lock");

        if let Err(e) = ready!(raw_conn.flush(cx)) {
            return Poll::Ready(Err(e));
        }

        Poll::Ready(raw_conn.close())
    }
}
/// Future that resolves to the result of trying to receive one message from the raw connection.
struct ReceiveMessage<'r> {
/// Raw connection to read from; it is locked on every poll.
raw_conn: &'r sync::Mutex<RawConnection<Box<dyn Socket>>>,
}
impl<'r> Future for ReceiveMessage<'r> {
    type Output = Result<Message>;

    /// Locks the raw connection and forwards to its `try_receive_message`.
    ///
    /// Panics if the lock is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let lock_result = self.raw_conn.lock();
        lock_result.expect("poisoned lock").try_receive_message(cx)
    }
}
/// Unwraps a blocking connection into the async connection returned by its `into_inner`.
impl From<crate::blocking::Connection> for Connection {
    fn from(conn: crate::blocking::Connection) -> Self {
        crate::blocking::Connection::into_inner(conn)
    }
}
/// Counterpart of `Connection` that holds only a weak reference to the shared state.
#[derive(Debug)]
pub(crate) struct WeakConnection {
/// Weak reference to the shared connection state.
inner: Weak<ConnectionInner>,
/// Clone of the originating connection's message receiver handle.
msg_receiver: InactiveReceiver<Arc<Message>>,
/// Clone of the originating connection's error receiver.
error_receiver: Receiver<Error>,
}
impl WeakConnection {
    /// Rebuilds a full `Connection`, or returns `None` when the shared state has already been
    /// dropped.
    pub fn upgrade(&self) -> Option<Connection> {
        let inner = self.inner.upgrade()?;

        Some(Connection {
            inner,
            msg_receiver: self.msg_receiver.clone(),
            error_receiver: self.error_receiver.clone(),
        })
    }
}
/// Downgrades a connection: the shared state is referenced weakly, the channel handles are
/// cloned.
impl From<&Connection> for WeakConnection {
    fn from(conn: &Connection) -> Self {
        let inner = Arc::downgrade(&conn.inner);
        let msg_receiver = conn.msg_receiver.clone();
        let error_receiver = conn.error_receiver.clone();

        Self {
            inner,
            msg_receiver,
            error_receiver,
        }
    }
}
#[cfg(test)]
mod tests {
use futures_util::stream::TryStreamExt;
use ntest::timeout;
use test_log::test;
use crate::AuthMechanism;
use super::*;
/// Exercises a method call, a signal and a reply across two peer-to-peer pairs.
///
/// Messages are forwarded in both directions between `server1` and `client2`. `client1` calls
/// `Test`; `server2` waits for that call, emits `ASignalForYou` and replies with `"yay"`.
async fn test_p2p(
    server1: Connection,
    client1: Connection,
    server2: Connection,
    client2: Connection,
) -> Result<()> {
    let _forward_task = client1.executor().spawn(async move {
        futures_util::try_join!(
            MessageStream::from(&server1).forward(&client2),
            MessageStream::from(&client2).forward(&server1),
        )
    });

    let server_future = async {
        let mut incoming = MessageStream::from(&server2);

        // Skip everything until the expected method call shows up.
        let call = loop {
            let candidate = incoming.try_next().await?.unwrap();
            if candidate.to_string() == "Method call Test" {
                break candidate;
            }
        };

        server2
            .emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &())
            .await?;
        server2.reply(&call, &("yay")).await
    };

    let client_future = async {
        // Created before the call so that the signal is not missed.
        let mut incoming = MessageStream::from(&client1);

        let reply = client1
            .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
            .await?;
        assert_eq!(reply.to_string(), "Method return");

        let signal = incoming.try_next().await?.unwrap();
        assert_eq!(signal.to_string(), "Signal ASignalForYou");

        reply.body::<String>()
    };

    let (body, _) = futures_util::try_join!(client_future, server_future,)?;
    assert_eq!(body, "yay");

    Ok(())
}
/// Runs the peer-to-peer scenario over TCP.
#[test]
#[timeout(15000)]
fn tcp_p2p() {
    let outcome = crate::utils::block_on(test_tcp_p2p());
    outcome.unwrap();
}
/// Creates two TCP peer-to-peer pairs and runs `test_p2p` on them.
async fn test_tcp_p2p() -> Result<()> {
    let first_pair = tcp_p2p_pipe().await?;
    let second_pair = tcp_p2p_pipe().await?;

    let (server1, client1) = first_pair;
    let (server2, client2) = second_pair;
    test_p2p(server1, client1, server2, client2).await
}
/// Builds a `(server, client)` pair of peer-to-peer connections over a loopback TCP socket.
///
/// The server side uses a freshly generated GUID and the `Anonymous` auth mechanism.
async fn tcp_p2p_pipe() -> Result<(Connection, Connection)> {
    let guid = Guid::generate();

    #[cfg(not(feature = "tokio"))]
    let (server_conn_builder, client_conn_builder) = {
        // Port 0 lets the OS pick a free port.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let client_stream = std::net::TcpStream::connect(addr).unwrap();
        let server_stream = listener.incoming().next().unwrap().unwrap();

        let server_builder = ConnectionBuilder::tcp_stream(server_stream)
            .server(&guid)
            .p2p()
            .auth_mechanisms(&[AuthMechanism::Anonymous]);
        let client_builder = ConnectionBuilder::tcp_stream(client_stream).p2p();

        (server_builder, client_builder)
    };

    #[cfg(feature = "tokio")]
    let (server_conn_builder, client_conn_builder) = {
        // Port 0 lets the OS pick a free port.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let client_stream = tokio::net::TcpStream::connect(addr).await.unwrap();
        let server_stream = listener.accept().await.unwrap().0;

        let server_builder = ConnectionBuilder::tcp_stream(server_stream)
            .server(&guid)
            .p2p()
            .auth_mechanisms(&[AuthMechanism::Anonymous]);
        let client_builder = ConnectionBuilder::tcp_stream(client_stream).p2p();

        (server_builder, client_builder)
    };

    futures_util::try_join!(server_conn_builder.build(), client_conn_builder.build())
}
/// Runs the peer-to-peer scenario over a Unix socket pair.
#[cfg(unix)]
#[test]
#[timeout(15000)]
fn unix_p2p() {
    let outcome = crate::utils::block_on(test_unix_p2p());
    outcome.unwrap();
}
/// Creates two Unix-socket peer-to-peer pairs and runs `test_p2p` on them.
#[cfg(unix)]
async fn test_unix_p2p() -> Result<()> {
    let first_pair = unix_p2p_pipe().await?;
    let second_pair = unix_p2p_pipe().await?;

    let (server1, client1) = first_pair;
    let (server2, client2) = second_pair;
    test_p2p(server1, client1, server2, client2).await
}
/// Builds a `(server, client)` pair of peer-to-peer connections over a Unix socket pair.
///
/// The pair is returned in `(server, client)` order, matching how the caller destructures it
/// and what the TCP and vsock helpers return.
#[cfg(unix)]
async fn unix_p2p_pipe() -> Result<(Connection, Connection)> {
    #[cfg(not(feature = "tokio"))]
    use std::os::unix::net::UnixStream;
    #[cfg(feature = "tokio")]
    use tokio::net::UnixStream;
    // Never active here: this function is compiled only under `cfg(unix)`.
    #[cfg(all(windows, not(feature = "tokio")))]
    use uds_windows::UnixStream;

    let guid = Guid::generate();
    let (p0, p1) = UnixStream::pair().unwrap();

    let server = ConnectionBuilder::unix_stream(p0)
        .server(&guid)
        .p2p()
        .build();
    let client = ConnectionBuilder::unix_stream(p1).p2p().build();

    // The futures are joined in the original order (client first); only the returned tuple is
    // put into `(server, client)` order.
    let (client, server) = futures_util::try_join!(client, server)?;
    Ok((server, client))
}
/// Runs the peer-to-peer scenario over vsock. Marked `#[ignore]`, so it only runs on request.
#[cfg(any(
    all(feature = "vsock", not(feature = "tokio")),
    feature = "tokio-vsock"
))]
#[test]
#[timeout(15000)]
#[ignore]
fn vsock_p2p() {
    let outcome = crate::utils::block_on(test_vsock_p2p());
    outcome.unwrap();
}
/// Creates two vsock peer-to-peer pairs and runs `test_p2p` on them.
#[cfg(any(
    all(feature = "vsock", not(feature = "tokio")),
    feature = "tokio-vsock"
))]
async fn test_vsock_p2p() -> Result<()> {
    let first_pair = vsock_p2p_pipe().await?;
    let second_pair = vsock_p2p_pipe().await?;

    let (server1, client1) = first_pair;
    let (server2, client2) = second_pair;
    test_p2p(server1, client1, server2, client2).await
}
/// Builds a `(server, client)` pair of peer-to-peer connections over vsock (port 42), using the
/// `vsock` crate.
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
    let guid = Guid::generate();

    let listener = vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, 42).unwrap();
    let addr = listener.local_addr().unwrap();
    let client_stream = vsock::VsockStream::connect(&addr).unwrap();
    let server_stream = listener.incoming().next().unwrap().unwrap();

    let server = ConnectionBuilder::vsock_stream(server_stream)
        .server(&guid)
        .p2p()
        .auth_mechanisms(&[AuthMechanism::Anonymous])
        .build();
    let client = ConnectionBuilder::vsock_stream(client_stream)
        .p2p()
        .build();

    futures_util::try_join!(server, client)
}
/// Builds a `(server, client)` pair of peer-to-peer connections over vsock, using the
/// `tokio_vsock` crate.
///
/// The listener is bound with `(2, 42)` and the client connects to `(3, 42)`.
#[cfg(feature = "tokio-vsock")]
async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
    let guid = Guid::generate();

    let listener = tokio_vsock::VsockListener::bind(2, 42).unwrap();
    let client_stream = tokio_vsock::VsockStream::connect(3, 42).await.unwrap();
    let server_stream = listener.incoming().next().await.unwrap().unwrap();

    let server = ConnectionBuilder::vsock_stream(server_stream)
        .server(&guid)
        .p2p()
        .auth_mechanisms(&[AuthMechanism::Anonymous])
        .build();
    let client = ConnectionBuilder::vsock_stream(client_stream)
        .p2p()
        .build();

    futures_util::try_join!(server, client)
}
/// Runs the serial-number check on a session bus connection.
#[test]
#[timeout(15000)]
fn serial_monotonically_increases() {
    let check = test_serial_monotonically_increases();
    crate::utils::block_on(check);
}
/// Checks that ten consecutive calls to `next_serial` each return the previous value plus one.
async fn test_serial_monotonically_increases() {
    let c = Connection::session().await.unwrap();

    let first = c.next_serial();
    for offset in 1..=10 {
        assert_eq!(first + offset, c.next_serial());
    }
}
/// Looks up the GDBus autolaunch session bus address on Windows and connects to it.
#[cfg(all(windows, feature = "windows-gdbus"))]
#[test]
fn connect_gdbus_session_bus() {
    let addr = crate::win32::windows_autolaunch_bus_address()
        .expect("Unable to get GDBus session bus address");

    let connected = crate::block_on(async { addr.connect().await });
    connected.expect("Unable to connect to session bus");
}
}