use futures_util::future::FutureExt;
use std::{
convert::TryInto,
future::ready,
os::unix::{
io::{AsRawFd, RawFd},
net::UnixStream,
},
};
use zvariant::ObjectPath;
use async_io::block_on;
use crate::{azync, Guid, Message, Result};
/// Blocking counterpart of [`azync::Connection`].
///
/// This is a newtype around the asynchronous connection: methods on this type either forward
/// directly to the wrapped value or drive the matching async call to completion on the calling
/// thread via `async_io::block_on`.
///
/// `Clone` is derived, so cloning this wrapper clones the wrapped [`azync::Connection`].
#[derive(Clone, Debug)]
pub struct Connection(azync::Connection);
/// Exposes the raw file descriptor reported by the wrapped async connection.
impl AsRawFd for Connection {
    /// Blocks the calling thread until the wrapped connection's `as_raw_fd` future resolves and
    /// returns the descriptor it yields.
    fn as_raw_fd(&self) -> RawFd {
        let pending_fd = self.0.as_raw_fd();
        block_on(pending_fd)
    }
}
impl Connection {
    /// Builds a client-side connection on top of an already-connected `stream`.
    ///
    /// Blocks the calling thread until `azync::Connection::new_unix_client` completes and wraps
    /// the connection it yields. `bus_connection` is forwarded untouched; the in-file test passes
    /// `false` for a peer-to-peer socket pair, so it presumably selects between bus and
    /// peer-to-peer setup (confirm against `azync`).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the async constructor reports.
    pub fn new_unix_client(stream: UnixStream, bus_connection: bool) -> Result<Self> {
        let conn = block_on(azync::Connection::new_unix_client(stream, bus_connection))?;
        Ok(Self(conn))
    }

    /// Blocks on `azync::Connection::new_session` and wraps the resulting connection
    /// (presumably a connection to the session bus).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the async constructor reports.
    pub fn new_session() -> Result<Self> {
        let conn = block_on(azync::Connection::new_session())?;
        Ok(Self(conn))
    }

    /// Blocks on `azync::Connection::new_system` and wraps the resulting connection
    /// (presumably a connection to the system bus).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the async constructor reports.
    pub fn new_system() -> Result<Self> {
        let conn = block_on(azync::Connection::new_system())?;
        Ok(Self(conn))
    }

    /// Blocks on `azync::Connection::new_for_address` with the given `address` and
    /// `bus_connection` flag, wrapping the resulting connection.
    ///
    /// Both arguments are forwarded unchanged; the accepted address syntax is defined by the
    /// async implementation.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the async constructor reports.
    pub fn new_for_address(address: &str, bus_connection: bool) -> Result<Self> {
        let conn = block_on(azync::Connection::new_for_address(address, bus_connection))?;
        Ok(Self(conn))
    }

    /// Builds a server-side connection on top of an already-connected `stream`.
    ///
    /// Blocks on `azync::Connection::new_unix_server`, handing it `stream` and `guid` unchanged,
    /// and wraps the connection it yields.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the async constructor reports.
    pub fn new_unix_server(stream: UnixStream, guid: &Guid) -> Result<Self> {
        let conn = block_on(azync::Connection::new_unix_server(stream, guid))?;
        Ok(Self(conn))
    }

    /// Returns the value reported by the wrapped connection's `max_queued`, blocking until it is
    /// available.
    pub fn max_queued(&self) -> usize {
        let pending = self.0.max_queued();
        block_on(pending)
    }

    /// Consumes the connection, applies `max` through the wrapped connection's
    /// `set_max_queued`, and returns the re-wrapped result so calls can be chained.
    pub fn set_max_queued(self, max: usize) -> Self {
        let updated = block_on(self.0.set_max_queued(max));
        Self(updated)
    }

    /// Returns the server GUID string as reported by the wrapped connection.
    pub fn server_guid(&self) -> &str {
        self.inner().server_guid()
    }

    /// Returns the unique name reported by the wrapped connection, or `None` if it has none.
    pub fn unique_name(&self) -> Option<&str> {
        self.inner().unique_name()
    }

    /// Blocks until the next message is available and returns it.
    ///
    /// Equivalent to [`Connection::receive_specific`] with a predicate that accepts every
    /// message.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped connection reports while receiving.
    pub fn receive_message(&self) -> Result<Message> {
        self.receive_specific(|_| Ok(true))
    }

    /// Blocks until the wrapped connection's `receive_specific` yields a message for the given
    /// `predicate`.
    ///
    /// The synchronous `predicate` is adapted to the async API by wrapping each of its results in
    /// an immediately-ready boxed future.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped connection reports; how an `Err` returned by
    /// `predicate` is surfaced is decided by the async implementation.
    pub fn receive_specific<P>(&self, predicate: P) -> Result<Message>
    where
        P: Fn(&Message) -> Result<bool>,
    {
        // The closure is kept inline in the call so its signature is inferred from the async
        // method's bound.
        let pending = self
            .0
            .receive_specific(|msg| ready(predicate(msg)).boxed());
        block_on(pending)
    }

    /// Hands `msg` to the wrapped connection's `send_message` and blocks until it completes.
    ///
    /// On success returns the `u32` produced by the async call (presumably the serial number
    /// assigned to the message).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped connection reports.
    pub fn send_message(&self, msg: Message) -> Result<u32> {
        let pending = self.0.send_message(msg);
        block_on(pending)
    }

    /// Performs a method call through the wrapped connection and blocks until it yields a
    /// message.
    ///
    /// `destination`, `path`, `iface`, `method_name` and `body` are forwarded unchanged to
    /// `azync::Connection::call_method`. `path` may be anything convertible into an
    /// [`ObjectPath`] with a `zvariant::Error` conversion error.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped connection reports.
    pub fn call_method<'p, B>(
        &self,
        destination: Option<&str>,
        path: impl TryInto<ObjectPath<'p>, Error = zvariant::Error>,
        iface: Option<&str>,
        method_name: &str,
        body: &B,
    ) -> Result<Message>
    where
        B: serde::ser::Serialize + zvariant::Type,
    {
        let pending = self
            .0
            .call_method(destination, path, iface, method_name, body);
        block_on(pending)
    }

    /// Emits a signal through the wrapped connection and blocks until the async call completes.
    ///
    /// `destination`, `path`, `iface`, `signal_name` and `body` are forwarded unchanged to
    /// `azync::Connection::emit_signal`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped connection reports.
    pub fn emit_signal<'p, B>(
        &self,
        destination: Option<&str>,
        path: impl TryInto<ObjectPath<'p>, Error = zvariant::Error>,
        iface: &str,
        signal_name: &str,
        body: &B,
    ) -> Result<()>
    where
        B: serde::ser::Serialize + zvariant::Type,
    {
        let pending = self
            .0
            .emit_signal(destination, path, iface, signal_name, body);
        block_on(pending)
    }

    /// Replies to `call` with `body` through the wrapped connection, blocking until done.
    ///
    /// On success returns the `u32` produced by the async call (presumably the serial number of
    /// the reply).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped connection reports.
    pub fn reply<B>(&self, call: &Message, body: &B) -> Result<u32>
    where
        B: serde::ser::Serialize + zvariant::Type,
    {
        let pending = self.0.reply(call, body);
        block_on(pending)
    }

    /// Replies to `call` with an error named `error_name` carrying `body`, blocking until done.
    ///
    /// On success returns the `u32` produced by the async call (presumably the serial number of
    /// the error reply).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped connection reports.
    pub fn reply_error<B>(&self, call: &Message, error_name: &str, body: &B) -> Result<u32>
    where
        B: serde::ser::Serialize + zvariant::Type,
    {
        let pending = self.0.reply_error(call, error_name, body);
        block_on(pending)
    }

    /// Returns the wrapped connection's `is_bus` flag.
    pub fn is_bus(&self) -> bool {
        self.inner().is_bus()
    }

    /// Borrows the underlying asynchronous connection.
    pub fn inner(&self) -> &azync::Connection {
        let Self(conn) = self;
        conn
    }

    /// Consumes the wrapper and returns the underlying asynchronous connection.
    pub fn into_inner(self) -> azync::Connection {
        let Self(conn) = self;
        conn
    }
}
impl From<azync::Connection> for Connection {
fn from(conn: azync::Connection) -> Self {
Self(conn)
}
}
#[cfg(test)]
mod tests {
    use crate::{Connection, Error, Guid};
    use std::{os::unix::net::UnixStream, thread};

    /// Round-trips one method call over a connected Unix socket pair, with the "server" end
    /// running on its own thread and the "client" end on the test thread.
    #[test]
    fn unix_p2p() {
        let guid = Guid::generate();
        let (server_sock, client_sock) = UnixStream::pair().unwrap();

        // Server end: sets up its side of the connection, issues the `Test` call and hands the
        // string body of the reply back to the test thread.
        let server = thread::spawn(move || {
            let conn = Connection::new_unix_server(server_sock, &guid).unwrap();
            let reply = conn
                .call_method(None, "/", Some("org.zbus.p2p"), "Test", &())
                .unwrap();
            assert_eq!(reply.to_string(), "Method return");
            let payload: String = reply.body().unwrap();
            payload
        });

        // Client end: `false` because this is not a bus connection.
        let client = Connection::new_unix_client(client_sock, false).unwrap();
        let call = client.receive_message().unwrap();
        assert_eq!(call.to_string(), "Method call Test");
        client.reply(&call, &"yay").unwrap();

        // A further receive is expected to fail with an I/O error, presumably because the
        // server thread drops its end of the socket once it has the reply.
        let recv_err = client.receive_message().unwrap_err();
        assert!(matches!(recv_err, Error::Io(_)));

        let echoed = server.join().expect("failed to join server thread");
        assert_eq!(echoed, "yay");
    }
}