use crate::error::{sock_error_msg, Error, Result};
use crate::proto::{check_oversize, MessageHeader, DEFAULT_PAGE_SIZE, MESSAGE_HEADER_LENGTH};
use crate::sync::sys::PipeConnection;
fn read_count(conn: &PipeConnection, count: usize) -> Result<Vec<u8>> {
let mut v: Vec<u8> = vec![0; count];
let mut len = 0;
if count == 0 {
return Ok(v.to_vec());
}
loop {
match conn.read(&mut v[len..]) {
Ok(l) => {
len += l;
if len == count || l == 0 {
break;
}
}
Err(e) => {
return Err(Error::Socket(e.to_string()));
}
}
}
Ok(v[0..len].to_vec())
}
fn write_count(conn: &PipeConnection, buf: &[u8], count: usize) -> Result<usize> {
let mut len = 0;
if count == 0 {
return Ok(0);
}
loop {
match conn.write(&buf[len..]) {
Ok(l) => {
len += l;
if len == count {
break;
}
}
Err(e) => {
return Err(Error::Socket(e.to_string()));
}
}
}
Ok(len)
}
fn discard_count(conn: &PipeConnection, count: usize) -> Result<()> {
let mut need_discard = count;
while need_discard > 0 {
let once_discard = std::cmp::min(DEFAULT_PAGE_SIZE, need_discard);
read_count(conn, once_discard)?;
need_discard -= once_discard;
}
Ok(())
}
fn read_message_header(conn: &PipeConnection) -> Result<MessageHeader> {
let buf = read_count(conn, MESSAGE_HEADER_LENGTH)?;
let size = buf.len();
if size != MESSAGE_HEADER_LENGTH {
return Err(sock_error_msg(
size,
format!("Message header length {size} is too small"),
));
}
let mh = MessageHeader::from(&buf);
Ok(mh)
}
pub fn read_message(conn: &PipeConnection) -> Result<(MessageHeader, Result<Vec<u8>>)> {
let mh = read_message_header(conn)?;
trace!("Got Message header {:?}", mh);
let mh_len = mh.length as usize;
if let Err(e) = check_oversize(mh_len, true) {
discard_count(conn, mh_len)?;
return Ok((mh, Err(e)));
}
let buf = read_count(conn, mh.length as usize)?;
let size = buf.len();
if size != mh.length as usize {
return Err(sock_error_msg(
size,
format!("Message length {} is not {}", size, mh.length),
));
}
trace!("Got Message body {:?}", buf);
Ok((mh, Ok(buf)))
}
fn write_message_header(conn: &PipeConnection, mh: MessageHeader) -> Result<()> {
let buf: Vec<u8> = mh.into();
let size = write_count(conn, &buf, MESSAGE_HEADER_LENGTH)?;
if size != MESSAGE_HEADER_LENGTH {
return Err(sock_error_msg(
size,
format!("Send Message header length size {size} is not right"),
));
}
Ok(())
}
pub fn write_message(conn: &PipeConnection, mh: MessageHeader, buf: Vec<u8>) -> Result<()> {
write_message_header(conn, mh)?;
let size = write_count(conn, &buf, buf.len())?;
if size != buf.len() {
return Err(sock_error_msg(
size,
format!("Send Message length size {size} is not right"),
));
}
Ok(())
}