use crate::{Error, Message};
use std::time::Duration;
use super::{BusType, Watch};
use futures_util::lock::Mutex as FMutex;
use std::sync::Mutex;
use futures_executor::block_on;
use dbus_native_channel::address;
use std::error::Error as stdError;
use futures_util::io as fio;
use std::pin::Pin;
use std::os::unix::net::UnixStream;
use std::collections::VecDeque;
use std::os::unix::io::{RawFd, AsRawFd};
/// A D-Bus connection over a Unix stream, with separate queues for outgoing
/// and incoming messages.
pub struct Channel {
/// Name taken from the reply to the `Hello` call made while connecting;
/// `None` until that reply has been read.
unique_name: Option<crate::strings::BusName<'static>>,
/// `.0` is the last serial number handed out by `send`; `.1` holds messages
/// queued by `send` that `flush_async` has not written yet.
out_queue: Mutex<(u32, VecDeque<Message>)>,
/// Messages that were read from the stream but not yet returned by `pop_message`.
in_queue: Mutex<VecDeque<Message>>,
/// Read half of the stream. Uses an async mutex because the guard is held across `.await`.
reader: FMutex<Pin<Box<dyn fio::AsyncBufRead + Send>>>,
/// Write half of the stream. Uses an async mutex because the guard is held across `.await`.
writer: FMutex<Pin<Box<dyn fio::AsyncWrite + Send>>>,
/// File descriptor of the connected stream, captured before the stream is
/// handed to the splitting closure; returned by `as_raw_fd` and `watch`.
raw_fd: RawFd,
/// Flag carried by `Authentication::Begin` at the end of the handshake.
/// NOTE(review): presumably "unix fd passing was negotiated" - confirm against
/// `dbus_native_channel`. Not read anywhere in the visible code.
unix_fd: bool,
}
/// Runs the client side of the D-Bus authentication handshake over `r` / `w`.
///
/// Writes the opening line produced by `Authentication::new`, then repeatedly
/// reads one `\n`-terminated line from the server, feeds it to the
/// authentication state machine and writes back whatever the state machine
/// answers, until the state machine reaches `Authentication::Begin`.
///
/// Returns the flag carried by `Authentication::Begin`
/// (NOTE(review): presumably whether unix fd passing was agreed - confirm).
///
/// # Errors
///
/// Fails on any I/O error, when the state machine rejects a server line, or
/// when the peer closes the connection before the handshake completes.
async fn do_auth<W: fio::AsyncWrite + std::marker::Unpin, R: fio::AsyncBufRead + std::marker::Unpin>(r: &mut R, w: &mut W) -> Result<bool, Box<dyn stdError>> {
    use dbus_native_channel::authentication::Authentication;
    use fio::{AsyncWriteExt, AsyncBufReadExt};
    let (mut auth, s) = Authentication::new(true);
    w.write_all(s.as_bytes()).await?;
    // `write_all` only hands the bytes to the writer; a buffered `AsyncWrite`
    // would sit on them while we wait for the server's answer below.
    w.flush().await?;
    loop {
        let mut line = vec!();
        // `read_until` reports 0 bytes only at end of stream: the peer hung up.
        if r.read_until(b'\n', &mut line).await? == 0 {
            return Err("D-Bus connection closed during authentication".into());
        }
        let reply = auth.handle(&line)?;
        w.write_all(reply.as_bytes()).await?;
        w.flush().await?;
        if let Authentication::Begin(unixfd) = &auth {
            return Ok(*unixfd);
        }
    }
}
impl Channel {
pub fn get_private(bus: BusType) -> Result<Channel, Error> {
block_on(async {
Self::get_private_async(bus, |s| {
let s2 = s.try_clone().unwrap();
let r = fio::AllowStdIo::new(std::io::BufReader::new(s));
let w = fio::AllowStdIo::new(s2);
(r, w)
}).await.map_err(|x| Error::new_failed(&x.to_string()))
})
}
/// Connects to `bus`, authenticates, and sends the `Hello` call whose reply
/// becomes the channel's unique name.
///
/// `f` receives the freshly connected (blocking) `UnixStream` and must return
/// the reader and writer halves the channel will use from then on. The
/// stream's raw file descriptor is recorded before `f` is called.
///
/// # Errors
///
/// Fails if the bus address cannot be read, the connection or authentication
/// fails, or the `Hello` exchange fails or returns something that is not a
/// valid bus name.
pub async fn get_private_async<R, W, F>(bus: BusType, f: F) -> Result<Channel, Box<dyn stdError>>
where
    R: fio::AsyncBufRead + 'static + Send,
    W: fio::AsyncWrite + 'static + Send,
    F: FnOnce(UnixStream) -> (R, W),
{
    let addr = match bus {
        BusType::Starter => address::read_starter_address()?,
        BusType::Session => address::read_session_address()?,
        BusType::System => address::read_system_address()?,
    };
    let stream = address::connect_blocking(&addr)?;
    // Must be read before `f` takes ownership of the stream.
    let raw_fd = stream.as_raw_fd();

    let (reader, writer) = f(stream);
    let mut reader = Box::pin(reader);
    let mut writer = Box::pin(writer);
    let unix_fd = do_auth(&mut reader, &mut writer).await?;

    let mut channel = Channel {
        unique_name: None,
        out_queue: Default::default(),
        in_queue: Default::default(),
        reader: FMutex::new(reader),
        writer: FMutex::new(writer),
        raw_fd,
        unix_fd,
    };

    let hello = Message::new_method_call("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "Hello")?;
    let reply = channel.send_with_reply_async(hello).await?;
    let name: String = reply.read1()?;
    channel.unique_name = Some(crate::strings::BusName::new(name)?);
    Ok(channel)
}
/// Assigns the next serial number to `msg` and appends it to the outgoing
/// queue. Nothing is written to the stream until `flush_async` / `flush` runs.
///
/// Returns the serial number given to the message. The current implementation
/// never returns `Err`.
///
/// # Panics
///
/// Panics if the outgoing-queue mutex is poisoned.
pub fn send(&self, mut msg: Message) -> Result<u32, ()> {
    let mut guard = self.out_queue.lock().unwrap();
    let (last_serial, pending) = &mut *guard;
    // A plain `+= 1` panics (debug) or wraps to 0 (release) once the counter
    // reaches `u32::MAX`. D-Bus does not allow a serial of 0, so wrap
    // explicitly and skip it.
    let serial = match last_serial.wrapping_add(1) {
        0 => 1,
        n => n,
    };
    *last_serial = serial;
    msg.set_serial(serial);
    pending.push_back(msg);
    Ok(serial)
}
/// Marshals `msg` into a byte buffer and writes the whole buffer to the
/// stream's write half, then flushes it.
///
/// # Errors
///
/// Returns the I/O error from writing or flushing.
async fn write_message(&self, msg: Message) -> Result<(), fio::Error> {
    use fio::AsyncWriteExt;
    let mut bytes = vec!();
    // The callback itself never fails, so the `Result<(), ()>` carries no
    // information and is deliberately ignored.
    let _: Result<(), ()> = msg.marshal(|chunk| {
        bytes.extend_from_slice(chunk);
        Ok(())
    });
    let mut writer = self.writer.lock().await;
    writer.write_all(&bytes).await?;
    // The writer is an arbitrary `AsyncWrite`; without a flush a buffering
    // implementation could hold the message back indefinitely.
    writer.flush().await
}
async fn read_message(&self) -> Result<Message, Error> {
use fio::AsyncReadExt;
let mut v = vec![0; 16];
let mut reader = self.reader.lock().await;
reader.read_exact(&mut v).await.map_err(|e| Error::new_failed(&e.to_string()))?;
let count = Message::demarshal_bytes_needed(&v).map_err(|_| Error::new_failed("Protocol error"))?;
v.resize(count, 0);
reader.read_exact(&mut v[16..]).await.map_err(|e| Error::new_failed(&e.to_string()))?;
Message::demarshal(&v)
}
pub async fn flush_async(&self) -> Result<(), Error> {
loop {
let msg = {
let mut q = self.out_queue.lock().unwrap();
if let Some(msg) = q.1.pop_front() { msg } else { return Ok(()) }
};
self.write_message(msg).await.map_err(|e| Error::new_failed(&e.to_string()))?;
}
}
/// Blocking variant of `flush_async`: writes out all queued messages on the
/// calling thread.
///
/// # Panics
///
/// Panics if `flush_async` returns an error.
pub fn flush(&self) {
    block_on(self.flush_async()).unwrap();
}
/// Returns the oldest queued incoming message, or, if none is queued, blocks
/// until one message has been read from the stream.
///
/// `_timeout` is currently ignored: the read blocks until a message arrives
/// or fails. On success the result is always `Some`.
///
/// # Errors
///
/// Returns the error from reading the message.
pub fn blocking_pop_message(&self, _timeout: Duration) -> Result<Option<Message>, Error> {
    match self.pop_message() {
        Some(queued) => Ok(Some(queued)),
        None => block_on(self.read_message()).map(Some),
    }
}
/// Currently a no-op: the body is empty and `_enable` is ignored.
pub fn set_watch_enabled(&mut self, _enable: bool) {
}
/// Describes the channel's file descriptor as a `Watch` with `read` set and
/// `write` cleared.
pub fn watch(&self) -> Watch {
    let fd = self.as_raw_fd();
    Watch { fd, read: true, write: false }
}
/// Always returns `true`; no connection state is checked here.
pub fn is_connected(&self) -> bool {
true
}
/// Removes and returns the oldest message on the incoming queue, or `None`
/// if the queue is empty. Does not touch the stream.
///
/// # Panics
///
/// Panics if the incoming-queue mutex is poisoned.
pub fn pop_message(&self) -> Option<Message> {
    self.in_queue.lock().unwrap().pop_front()
}
/// Blocking I/O step: writes out all queued outgoing messages, then reads one
/// message from the stream and appends it to the incoming queue.
///
/// `_timeout` is currently ignored; the call blocks until a message has been
/// read or an error occurs.
///
/// # Errors
///
/// Returns `Err(())` if flushing or reading fails; the underlying error is
/// discarded.
pub fn read_write(&self, _timeout: Option<Duration>) -> Result<(), ()> {
    let incoming = block_on(async {
        self.flush_async().await.map_err(|_| ())?;
        self.read_message().await.map_err(|_| ())
    })?;
    self.in_queue.lock().unwrap().push_back(incoming);
    Ok(())
}
/// Blocking variant of `send_with_reply_async`: sends `msg` and blocks the
/// calling thread until its reply has been read.
///
/// `_timeout` is currently ignored.
///
/// # Errors
///
/// Returns whatever error `send_with_reply_async` produces.
pub fn send_with_reply_and_block(&self, msg: Message, _timeout: Duration) -> Result<Message, Error> {
    block_on(self.send_with_reply_async(msg))
}
pub async fn send_with_reply_async(&self, msg: Message) -> Result<Message, Error> {
let serial = self.send(msg).map_err(|_| Error::new_failed("Failed to send message"))?;
self.flush_async().await?;
loop {
let msg = self.read_message().await?;
if msg.get_reply_serial() == Some(serial) {
return Ok(msg);
}
let mut q = self.in_queue.lock().unwrap();
q.push_back(msg)
}
}
/// Returns the unique name stored on this channel as a string slice, or
/// `None` if no name has been stored.
pub fn unique_name(&self) -> Option<&str> {
    match &self.unique_name {
        Some(name) => Some(&**name),
        None => None,
    }
}
}
/// Exposes the file descriptor that was recorded when the channel connected.
impl AsRawFd for Channel {
// Returns the stored descriptor; the stream itself is not queried.
fn as_raw_fd(&self) -> RawFd { self.raw_fd }
}