use nix::NixPath;
use nix::errno::Errno;
use nix::sys::socket::{
AddressFamily, Backlog, SockFlag, SockType, UnixAddr, accept, bind, connect, listen, socket,
};
use nix::unistd::unlink;
use std::os::fd::{BorrowedFd, OwnedFd, RawFd};
use std::os::unix::io::AsRawFd;
use crate::VectorConfig;
use crate::channel::ChannelVector;
use crate::error::*;
use crate::protocol::{create_request, create_response, parse_request, parse_response};
use crate::resource::VectorResource;
use crate::unix::{UnixMessageRx, UnixMessageTx};
/// Listening Unix domain socket together with the address it was bound to.
pub struct Server {
/// Descriptor of the listening socket; closed when the server is dropped.
sockfd: OwnedFd,
/// Address the socket was bound to; its filesystem path (if it has one) is
/// unlinked by the `Drop` implementation.
addr: UnixAddr,
}
impl Server {
pub fn new<P: ?Sized + NixPath>(path: &P, backlog: Backlog) -> Result<Self, Errno> {
let addr = UnixAddr::new(path)?;
let sockfd = socket(
AddressFamily::Unix,
SockType::SeqPacket,
SockFlag::empty(),
None,
)?;
bind(sockfd.as_raw_fd(), &addr)?;
listen(&sockfd, backlog)?;
Ok(Self { sockfd, addr })
}
fn handle_request<F>(socket: RawFd, filter: F) -> Result<ChannelVector, TransferError>
where
F: Fn(&VectorResource) -> bool,
{
let mut req = UnixMessageRx::receive(socket.as_raw_fd())?;
let mut fds = req.take_fds();
let vconfig = parse_request(req.content())?;
let shmfd = fds
.pop_front()
.ok_or(TransferError::MissingFileDescriptor)?;
let n_consumer_eventfds = vconfig.count_consumer_eventfds();
let producer_eventfds = fds.split_off(n_consumer_eventfds);
let rsc = VectorResource::new(&vconfig, shmfd, fds, producer_eventfds)?;
if !filter(&rsc) {
return Err(TransferError::Rejected);
}
let vec = ChannelVector::new(rsc)?;
Ok(vec)
}
pub fn conditional_accept<F>(&self, filter: F) -> Result<ChannelVector, TransferError>
where
F: Fn(&VectorResource) -> bool,
{
let socket = accept(self.sockfd.as_raw_fd())?;
let result = Self::handle_request(socket, filter);
let response_msg = create_response(result.is_ok());
let response = UnixMessageTx::new(response_msg, Vec::with_capacity(0));
response.send(socket)?;
result
}
pub fn accept(&self) -> Result<ChannelVector, TransferError> {
self.conditional_accept(|_| true)
}
}
/// Runs the client side of the handshake over an already connected socket.
///
/// Allocates a `VectorResource` for `vconfig`, sends the request together with the
/// shared-memory descriptor followed by the producer and then the consumer eventfds,
/// waits for the response and, if it parses successfully, builds the `ChannelVector`.
/// The socket is only borrowed; it is not closed here.
///
/// # Errors
///
/// Returns any error from allocating the resource, sending the request, receiving
/// or parsing the response, or constructing the channel vector.
pub fn client_connect_fd(
    socket: RawFd,
    vconfig: VectorConfig,
) -> Result<ChannelVector, TransferError> {
    let rsc = VectorResource::allocate(&vconfig)?;
    let req_msg = create_request(&vconfig);
    let producer_fds = rsc.collect_producer_eventfds();
    let consumer_fds = rsc.collect_consumer_eventfds();
    // Descriptor order in the message: shared memory, producer eventfds, consumer eventfds.
    let mut fds =
        Vec::<BorrowedFd<'_>>::with_capacity(1 + producer_fds.len() + consumer_fds.len());
    fds.push(rsc.shmfd());
    fds.extend(producer_fds);
    fds.extend(consumer_fds);
    let req = UnixMessageTx::new(req_msg, fds);
    req.send(socket)?;
    let response = UnixMessageRx::receive(socket)?;
    parse_response(response.content().as_slice())?;
    ChannelVector::new(rsc)
}

/// Connects to the `SOCK_SEQPACKET` Unix domain socket at `path` and performs the
/// handshake via [`client_connect_fd`].
///
/// The socket is created for this call only and is closed when the function returns.
///
/// # Errors
///
/// Returns any error from creating the socket, building the address or connecting,
/// plus every error [`client_connect_fd`] can return.
pub fn client_connect<P: ?Sized + NixPath>(
    path: &P,
    vconfig: VectorConfig,
) -> Result<ChannelVector, TransferError> {
    let sockfd = socket(
        AddressFamily::Unix,
        SockType::SeqPacket,
        SockFlag::empty(),
        None,
    )?;
    let addr = UnixAddr::new(path)?;
    connect(sockfd.as_raw_fd(), &addr)?;
    // Same handshake as for a caller-supplied descriptor; `sockfd` stays open until
    // the call has finished.
    client_connect_fd(sockfd.as_raw_fd(), vconfig)
}
impl Drop for Server {
    /// Unlinks the path the socket was bound to, when the address has one.
    /// A failing `unlink` is ignored.
    fn drop(&mut self) {
        let Some(bound_path) = self.addr.path() else {
            // No filesystem entry is associated with this address.
            return;
        };
        let _ = unlink(bound_path);
    }
}