use std::collections::VecDeque;
use std::io;
use crate::message::Message;
use crate::message_header::MIN_MESSAGE_SIZE;
use crate::raw::Socket;
use crate::OwnedFd;
/// Message-level wrapper around a socket of type `S`.
///
/// Holds the buffers needed to resume a receive or a send that could not be
/// completed in a single call: partially read bytes and file descriptors on
/// the input side, and queued messages plus leftover bytes on the output side.
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct Connection<S> {
/// The wrapped socket; left out of the derived `Debug` output.
#[derivative(Debug = "ignore")]
socket: S,
/// Bytes read so far for the start of the next incoming message; filled up
/// to `MIN_MESSAGE_SIZE` and cleared once a `Message` has been created.
raw_in_buffer: Vec<u8>,
/// File descriptors received alongside incoming bytes, handed over to the
/// message once it is complete.
raw_in_fds: Vec<OwnedFd>,
/// Incoming message that has been started but may still be missing bytes.
msg_in_buffer: Option<Message>,
/// Remaining bytes of a message that was only partially sent.
raw_out_buffer: VecDeque<u8>,
/// Messages queued for sending, oldest first.
msg_out_buffer: VecDeque<Message>,
}
impl<S: Socket> Connection<S> {
    /// Wraps `socket` in a `Connection` whose input and output buffers are all empty.
    pub(crate) fn wrap(socket: S) -> Connection<S> {
        Connection {
            socket,
            raw_in_buffer: vec![],
            raw_in_fds: vec![],
            msg_in_buffer: None,
            raw_out_buffer: VecDeque::new(),
            msg_out_buffer: VecDeque::new(),
        }
    }

    /// Error returned when the socket reports that it accepted zero bytes of a
    /// non-empty buffer; retrying in a loop would never make progress.
    fn write_zero() -> io::Error {
        io::Error::new(
            io::ErrorKind::WriteZero,
            "socket accepted no bytes while flushing outgoing data",
        )
    }

    /// Error returned when the socket reports a zero-length read into a
    /// non-empty buffer, i.e. the stream ended in the middle of a message.
    fn unexpected_eof() -> io::Error {
        io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "socket returned no data while receiving a message",
        )
    }

    /// Attempts to write all pending outgoing data to the socket.
    ///
    /// Leftover bytes of a previously interrupted message are sent first, then
    /// every queued message in order. The file descriptors of a message are
    /// passed only with the first `sendmsg` call for that message.
    ///
    /// # Errors
    ///
    /// Any error reported by the socket is returned unchanged. If a message
    /// was partially written when the error occurred, its unsent bytes are
    /// kept so that a later call resumes from there; a message for which
    /// nothing was written stays queued. A `WriteZero` error is returned if
    /// the socket reports writing zero bytes of a non-empty buffer.
    pub fn try_flush(&mut self) -> io::Result<()> {
        // First finish whatever is left of a partially sent message.
        while !self.raw_out_buffer.is_empty() {
            let (front, _) = self.raw_out_buffer.as_slices();
            // A non-empty VecDeque never yields an empty front slice.
            debug_assert!(!front.is_empty());
            let written = self.socket.sendmsg(front, &[])?;
            if written == 0 {
                // No progress is possible; bail out instead of spinning.
                return Err(Self::write_zero());
            }
            self.raw_out_buffer.drain(..written);
        }

        // Then send the queued messages one by one.
        while let Some(msg) = self.msg_out_buffer.front() {
            let data = msg.as_bytes();
            let fds = msg.fds();
            // On error nothing was sent, so the message stays at the front of the queue.
            let written = self.socket.sendmsg(data, &fds)?;
            if written == 0 && !data.is_empty() {
                return Err(Self::write_zero());
            }

            // Part of the message (and its fds) went out: from now on it must
            // leave the queue, and any unsent tail is tracked as raw bytes.
            let msg = self
                .msg_out_buffer
                .pop_front()
                .expect("front() just returned a message");
            let mut rest = &msg.as_bytes()[written..];
            while !rest.is_empty() {
                let err = match self.socket.sendmsg(rest, &[]) {
                    Ok(0) => Self::write_zero(),
                    Ok(n) => {
                        rest = &rest[n..];
                        continue;
                    }
                    Err(e) => e,
                };
                // Cannot send more right now: remember the tail and report the error.
                self.raw_out_buffer.extend(rest);
                return Err(err);
            }
        }
        Ok(())
    }

    /// Appends `msg` to the queue of outgoing messages.
    ///
    /// Nothing is written to the socket here; call [`Connection::try_flush`] to send.
    pub fn enqueue_message(&mut self, msg: Message) {
        self.msg_out_buffer.push_back(msg);
    }

    /// Attempts to read one complete message from the socket.
    ///
    /// First reads until `MIN_MESSAGE_SIZE` bytes are available and builds a
    /// `Message` from them, then keeps reading until the message reports that
    /// no more bytes are needed. File descriptors received along the way are
    /// attached to the returned message.
    ///
    /// # Errors
    ///
    /// Errors from the socket or from message parsing are returned. Data read
    /// before a socket error stays buffered, so a later call continues where
    /// this one stopped. An `UnexpectedEof` I/O error is returned if the socket
    /// reports a zero-length read while more bytes are still required.
    pub fn try_receive_message(&mut self) -> crate::Result<Message> {
        if self.msg_in_buffer.is_none() {
            // Not enough data for a message yet; a previous call may have left
            // a partial read in raw_in_buffer, so only request what is missing.
            while self.raw_in_buffer.len() < MIN_MESSAGE_SIZE {
                let missing = MIN_MESSAGE_SIZE - self.raw_in_buffer.len();
                let mut buf = vec![0; missing];
                let (read, fds) = self.socket.recvmsg(&mut buf)?;
                if read == 0 {
                    // Zero bytes into a non-empty buffer: the stream ended.
                    return Err(Self::unexpected_eof().into());
                }
                self.raw_in_buffer.extend(&buf[..read]);
                self.raw_in_fds.extend(fds);
            }

            self.msg_in_buffer = Some(Message::from_bytes(&self.raw_in_buffer)?);
            self.raw_in_buffer.clear();
        }

        // A message object exists now; read until it is complete.
        {
            let msg = self
                .msg_in_buffer
                .as_mut()
                .expect("msg_in_buffer is populated above");
            loop {
                let needed = msg.bytes_to_completion()?;
                if needed == 0 {
                    break;
                }
                let mut buf = vec![0; needed];
                let (read, fds) = self.socket.recvmsg(&mut buf)?;
                if read == 0 {
                    return Err(Self::unexpected_eof().into());
                }
                msg.add_bytes(&buf[..read])?;
                self.raw_in_fds.extend(fds);
            }
        }

        // The message is complete: hand it out together with the collected fds.
        let mut msg = self
            .msg_in_buffer
            .take()
            .expect("msg_in_buffer is populated above");
        msg.set_owned_fds(std::mem::take(&mut self.raw_in_fds));
        Ok(msg)
    }

    /// Returns a shared reference to the wrapped socket.
    pub fn socket(&self) -> &S {
        &self.socket
    }
}
#[cfg(test)]
mod tests {
    use super::Connection;
    use crate::message::Message;
    use std::os::unix::net::UnixStream;

    /// Sends a single method call across a connected socket pair and checks
    /// that the other end receives it.
    #[test]
    fn raw_send_receive() {
        // Two connected stream sockets, each wrapped in its own Connection.
        let (sender_socket, receiver_socket) = UnixStream::pair().unwrap();
        let mut sender = Connection::wrap(sender_socket);
        let mut receiver = Connection::wrap(receiver_socket);

        // Queue the call on one side and push it onto the wire.
        let call = Message::method(None, None, "/", Some("org.zbus.p2p"), "Test", &()).unwrap();
        sender.enqueue_message(call);
        sender.try_flush().unwrap();

        // The other side must see the same call.
        let received = receiver.try_receive_message().unwrap();
        assert_eq!(received.to_string(), "Method call Test");
    }
}