1use core::ops::{Deref, DerefMut};
2use std::collections::{HashMap, VecDeque};
3use std::ops::ControlFlow;
4
5use libredox::error::Error as LError;
6
7use syscall::error::{self as errno, ECANCELED, EIO, EOPNOTSUPP};
8use syscall::{Error, Result};
9
10use crate::scheme::{Op, SchemeResponse, SchemeSync};
11use crate::{CallerCtx, Id, Request, RequestKind, Response, SignalBehavior, Socket};
12
13pub struct ReadinessBased<'sock> {
14 requests_read: Vec<Request>,
16 responses_to_write: VecDeque<Response>,
17
18 states: HashMap<Id, (CallerCtx, Op)>,
19 ready_queue: VecDeque<Id>,
20
21 socket: &'sock Socket,
22}
23impl<'sock> ReadinessBased<'sock> {
24 pub fn new(socket: &'sock Socket, queue_size: usize) -> Self {
25 Self {
26 requests_read: Vec::with_capacity(queue_size),
27 responses_to_write: VecDeque::with_capacity(queue_size),
28 states: HashMap::new(),
29 socket,
30 ready_queue: VecDeque::new(),
31 }
32 }
33 pub fn read_requests(&mut self) -> Result<bool> {
34 assert!(self.requests_read.is_empty());
35
36 match self
37 .socket
38 .read_requests(&mut self.requests_read, SignalBehavior::Interrupt)
39 {
40 Ok(()) if self.requests_read.is_empty() => Ok(false), Ok(())
43 | Err(Error {
44 errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
45 }) => Ok(true),
46 Err(err) => return Err(err),
47 }
48 }
49 pub fn process_requests<Guard, S: SchemeSync>(
50 &mut self,
51 mut acquire_scheme: impl FnMut() -> Guard,
53 ) where
54 Guard: Deref<Target = S> + DerefMut,
55 {
56 for request in self.requests_read.drain(..) {
57 let req = match request.kind() {
58 RequestKind::Call(c) => c,
59 RequestKind::Cancellation(req) => {
60 if let Some((_caller, op)) = self.states.remove(&req.id) {
61 self.responses_to_write
62 .push_back(Response::err(ECANCELED, op));
63 }
64 continue;
65 }
66 RequestKind::OnClose { id } => {
67 acquire_scheme().on_close(id);
68 continue;
69 }
70 RequestKind::SendFd(sendfd_request) => {
71 let result: Result<usize, Error> = acquire_scheme().on_sendfd(&sendfd_request);
72 let response = Response::new(result, sendfd_request);
73 self.responses_to_write.push_back(response);
74 continue;
75 }
76 _ => continue,
77 };
78 let caller = req.caller();
79 let mut op = match req.op() {
80 Ok(op) => op,
81 Err(req) => {
82 self.responses_to_write
83 .push_back(Response::err(EOPNOTSUPP, req));
84 continue;
85 }
86 };
87 let resp = match op.handle_sync_dont_consume(&caller, &mut *acquire_scheme()) {
88 SchemeResponse::Opened(Err(Error {
89 errno: errno::EWOULDBLOCK | errno::EAGAIN,
90 }))
91 | SchemeResponse::Regular(Err(Error {
92 errno: errno::EWOULDBLOCK | errno::EAGAIN,
93 })) if !op.is_explicitly_nonblock() => {
94 self.states.insert(caller.id, (caller, op));
95 continue;
96 }
97 SchemeResponse::Regular(r) => Response::new(r, op),
98 SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
99 };
100 self.responses_to_write.push_back(resp);
101 }
102 }
103 pub fn poll_request(&mut self, id: Id, scheme: &mut impl SchemeSync) -> Result<bool> {
106 Ok(
107 match Self::poll_request_inner(id, scheme, &mut self.states)? {
108 ControlFlow::Continue((caller, op)) => {
109 self.states.insert(id, (caller, op));
110 false
111 }
112 ControlFlow::Break(resp) => {
113 self.responses_to_write.push_back(resp);
114 true
115 }
116 },
117 )
118 }
119 fn poll_request_inner(
120 id: Id,
121 scheme: &mut impl SchemeSync,
122 states: &mut HashMap<Id, (CallerCtx, Op)>,
123 ) -> Result<ControlFlow<Response, (CallerCtx, Op)>> {
124 let (caller, mut op) = states.remove(&id).ok_or(Error::new(EIO))?;
125 let resp = match op.handle_sync_dont_consume(&caller, scheme) {
126 SchemeResponse::Opened(Err(Error {
127 errno: errno::EWOULDBLOCK | errno::EAGAIN,
128 }))
129 | SchemeResponse::Regular(Err(Error {
130 errno: errno::EWOULDBLOCK | errno::EAGAIN,
131 })) if !op.is_explicitly_nonblock() => {
132 return Ok(ControlFlow::Continue((caller, op)));
133 }
134 SchemeResponse::Regular(r) => Response::new(r, op),
135 SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
136 };
137 Ok(ControlFlow::Break(resp))
138 }
139 pub fn poll_ready_requests<S, G>(&mut self, mut acquire_scheme: impl FnMut() -> G) -> Result<()>
140 where
141 S: SchemeSync,
142 G: Deref<Target = S> + DerefMut,
143 {
144 for id in self.ready_queue.drain(..) {
145 match Self::poll_request_inner(id, &mut *acquire_scheme(), &mut self.states)? {
146 ControlFlow::Break(resp) => {
147 self.responses_to_write.push_back(resp);
148 }
149 ControlFlow::Continue((caller, op)) => {
150 self.states.insert(id, (caller, op));
151 }
152 }
153 }
154 Ok(())
155 }
156 pub fn poll_all_requests<S, G>(&mut self, acquire_scheme: impl FnMut() -> G) -> Result<()>
157 where
158 S: SchemeSync,
159 G: Deref<Target = S> + DerefMut,
160 {
161 self.ready_queue.clear();
163 self.ready_queue.extend(self.states.keys().copied());
164 self.poll_ready_requests(acquire_scheme)
165 }
166 pub fn write_responses(&mut self) -> Result<bool> {
167 match self
168 .socket
169 .write_responses(&mut self.responses_to_write, SignalBehavior::Restart)
170 {
171 Ok(())
174 | Err(Error {
175 errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
176 }) => Ok(true),
177 Err(err) => return Err(LError::from(err).into()),
178 }
179 }
180}