use core::ops::{Deref, DerefMut};
use std::collections::{HashMap, VecDeque};
use std::ops::ControlFlow;
use libredox::error::Error as LError;
use syscall::error::{self as errno, Error, ECANCELED, EIO, EOPNOTSUPP};
use syscall::Result;
use crate::scheme::{Op, SchemeResponse, SchemeSync};
use crate::{CallerCtx, Id, Request, RequestKind, Response, SignalBehavior, Socket};
pub struct ReadinessBased<'sock> {
requests_read: Vec<Request>,
responses_to_write: VecDeque<Response>,
states: HashMap<Id, (CallerCtx, Op)>,
ready_queue: VecDeque<Id>,
socket: &'sock Socket,
}
impl<'sock> ReadinessBased<'sock> {
pub fn new(socket: &'sock Socket, queue_size: usize) -> Self {
Self {
requests_read: Vec::with_capacity(queue_size),
responses_to_write: VecDeque::with_capacity(queue_size),
states: HashMap::new(),
socket,
ready_queue: VecDeque::new(),
}
}
pub fn read_requests(&mut self) -> Result<bool> {
assert!(self.requests_read.is_empty());
match self
.socket
.read_requests(&mut self.requests_read, SignalBehavior::Interrupt)
{
Ok(()) if self.requests_read.is_empty() => Ok(false), Ok(())
| Err(Error {
errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
}) => Ok(true),
Err(err) => return Err(err),
}
}
pub fn process_requests<Guard, S: SchemeSync>(
&mut self,
mut acquire_scheme: impl FnMut() -> Guard,
) where
Guard: Deref<Target = S> + DerefMut,
{
for request in self.requests_read.drain(..) {
let req = match request.kind() {
RequestKind::Call(c) => c,
RequestKind::Cancellation(req) => {
if let Some((_caller, op)) = self.states.remove(&req.id) {
self.responses_to_write
.push_back(Response::err(ECANCELED, op));
}
continue;
}
RequestKind::OnClose { id } => {
acquire_scheme().on_close(id);
continue;
}
RequestKind::SendFd(sendfd_request) => {
let result = acquire_scheme().on_sendfd(&sendfd_request);
let response = Response::new(result, sendfd_request);
self.responses_to_write.push_back(response);
continue;
}
RequestKind::RecvFd(recvfd_request) => {
let result = acquire_scheme().on_recvfd(&recvfd_request);
let caller = recvfd_request.caller();
if let Err(Error {
errno: errno::EWOULDBLOCK,
}) = result
{
self.states.insert(caller.id, (caller, recvfd_request.op()));
continue;
}
let response = Response::open_dup_like(result, recvfd_request);
self.responses_to_write.push_back(response);
continue;
}
_ => continue,
};
let caller = req.caller();
let mut op = match req.op() {
Ok(op) => op,
Err(req) => {
self.responses_to_write
.push_back(Response::err(EOPNOTSUPP, req));
continue;
}
};
let resp = match op.handle_sync_dont_consume(&caller, &mut *acquire_scheme()) {
SchemeResponse::Opened(Err(Error {
errno: errno::EWOULDBLOCK,
}))
| SchemeResponse::Regular(Err(Error {
errno: errno::EWOULDBLOCK,
})) if !op.is_explicitly_nonblock() => {
self.states.insert(caller.id, (caller, op));
continue;
}
SchemeResponse::Regular(r) => Response::new(r, op),
SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
};
self.responses_to_write.push_back(resp);
}
}
pub fn poll_request(&mut self, id: Id, scheme: &mut impl SchemeSync) -> Result<bool> {
Ok(
match Self::poll_request_inner(id, scheme, &mut self.states)? {
ControlFlow::Continue((caller, op)) => {
self.states.insert(id, (caller, op));
false
}
ControlFlow::Break(resp) => {
self.responses_to_write.push_back(resp);
true
}
},
)
}
fn poll_request_inner(
id: Id,
scheme: &mut impl SchemeSync,
states: &mut HashMap<Id, (CallerCtx, Op)>,
) -> Result<ControlFlow<Response, (CallerCtx, Op)>> {
let (caller, mut op) = states.remove(&id).ok_or(Error::new(EIO))?;
let resp = match op.handle_sync_dont_consume(&caller, scheme) {
SchemeResponse::Opened(Err(Error {
errno: errno::EWOULDBLOCK,
}))
| SchemeResponse::Regular(Err(Error {
errno: errno::EWOULDBLOCK,
})) if !op.is_explicitly_nonblock() => {
return Ok(ControlFlow::Continue((caller, op)));
}
SchemeResponse::Regular(r) => Response::new(r, op),
SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
};
Ok(ControlFlow::Break(resp))
}
pub fn poll_ready_requests<S, G>(&mut self, mut acquire_scheme: impl FnMut() -> G) -> Result<()>
where
S: SchemeSync,
G: Deref<Target = S> + DerefMut,
{
for id in self.ready_queue.drain(..) {
match Self::poll_request_inner(id, &mut *acquire_scheme(), &mut self.states)? {
ControlFlow::Break(resp) => {
self.responses_to_write.push_back(resp);
}
ControlFlow::Continue((caller, op)) => {
self.states.insert(id, (caller, op));
}
}
}
Ok(())
}
pub fn poll_all_requests<S, G>(&mut self, acquire_scheme: impl FnMut() -> G) -> Result<()>
where
S: SchemeSync,
G: Deref<Target = S> + DerefMut,
{
self.ready_queue.clear();
self.ready_queue.extend(self.states.keys().copied());
self.poll_ready_requests(acquire_scheme)
}
pub fn write_responses(&mut self) -> Result<bool> {
match self
.socket
.write_responses(&mut self.responses_to_write, SignalBehavior::Restart)
{
Ok(())
| Err(Error {
errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
}) => Ok(true),
Err(err) => return Err(LError::from(err).into()),
}
}
}