use bytes::{Buf, BytesMut};
use dbus_message_parser::{decode::DecodeError, message::Message};
use futures::{
channel::mpsc::{UnboundedReceiver, UnboundedSender},
stream::StreamExt,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Forwards every [`Message`] received on `message_receiver` to `sink`.
///
/// Each message is encoded and its bytes are written to `sink` in full
/// before the next message is taken from the channel.
///
/// The task ends when:
/// * `message_receiver` yields `None` (returns silently),
/// * encoding a message fails (the error is logged), or
/// * writing to `sink` fails (the error is logged).
pub async fn message_sink<T>(mut message_receiver: UnboundedReceiver<Message>, mut sink: T)
where
    T: AsyncWriteExt + Unpin,
{
    while let Some(msg) = message_receiver.next().await {
        let buffer = match msg.encode() {
            Ok(buffer) => buffer,
            Err(e) => {
                error!("message_sink: {:?}", e);
                return;
            }
        };
        // `write_all` keeps writing until every byte is accepted. Unlike a
        // manual `write` loop it treats a zero-length write as an error, so a
        // writer that stops accepting data ends this task instead of making
        // it spin forever without progress.
        if let Err(e) = sink.write_all(&buffer[..]).await {
            error!("message_sink: {:?}", e);
            return;
        }
    }
}
/// Reads raw bytes from `stream`, decodes them into [`Message`]s and sends
/// each decoded message through `message_sink`.
///
/// Bytes are read in chunks of at most 128 bytes and accumulated in a pending
/// buffer. After every read, as many messages as possible are decoded from
/// that buffer; a `DecodeError::NotEnoughBytes` result just means more data
/// has to be read first.
///
/// The task ends (after logging) when:
/// * a read returns zero bytes,
/// * a read fails,
/// * decoding fails with anything other than `DecodeError::NotEnoughBytes`, or
/// * sending a decoded message through `message_sink` fails.
pub async fn message_stream<T>(mut stream: T, message_sink: UnboundedSender<Message>)
where
    T: AsyncReadExt + Unpin,
{
    let mut pending = BytesMut::new();
    let mut chunk = [0u8; 128];

    loop {
        let received = match stream.read(&mut chunk[..]).await {
            Ok(n) => n,
            Err(e) => {
                error!("message_stream: {:?}", e);
                return;
            }
        };
        if received == 0 {
            error!("message_stream: size == 0");
            return;
        }
        pending.extend_from_slice(&chunk[..received]);

        // Decode every complete message currently held in `pending`.
        while !pending.is_empty() {
            // `decode` takes ownership of its input, so it gets a frozen copy
            // and `pending` keeps the bytes until we know how many were used.
            match Message::decode(pending.clone().freeze()) {
                Ok((msg, consumed)) => {
                    pending.advance(consumed);
                    if let Err(e) = message_sink.unbounded_send(msg) {
                        error!("message_stream: {}", e);
                        return;
                    }
                }
                Err(DecodeError::NotEnoughBytes(u1, u2)) => {
                    debug!("message_stream: DecodeError::NotEnoughBytes({}, {})", u1, u2);
                    break;
                }
                Err(e) => {
                    error!("message_stream: {:?}", e);
                    return;
                }
            }
        }

        // Everything was consumed: start over with a fresh buffer.
        if pending.is_empty() {
            pending = BytesMut::new();
        }
    }
}