1use core::ops::{Deref, DerefMut};
2use std::collections::{HashMap, VecDeque};
3use std::ops::ControlFlow;
4
5use libredox::error::Error as LError;
6
7use syscall::error::{self as errno, Error, ECANCELED, EIO, EOPNOTSUPP};
8use syscall::Result;
9
10use crate::scheme::{Op, SchemeResponse, SchemeSync};
11use crate::{CallerCtx, Id, Request, RequestKind, Response, SignalBehavior, Socket};
12
13pub struct ReadinessBased<'sock> {
14 requests_read: Vec<Request>,
16 responses_to_write: VecDeque<Response>,
17
18 states: HashMap<Id, (CallerCtx, Op)>,
19 ready_queue: VecDeque<Id>,
20
21 socket: &'sock Socket,
22}
23impl<'sock> ReadinessBased<'sock> {
24 pub fn new(socket: &'sock Socket, queue_size: usize) -> Self {
25 Self {
26 requests_read: Vec::with_capacity(queue_size),
27 responses_to_write: VecDeque::with_capacity(queue_size),
28 states: HashMap::new(),
29 socket,
30 ready_queue: VecDeque::new(),
31 }
32 }
33 pub fn read_requests(&mut self) -> Result<bool> {
34 assert!(self.requests_read.is_empty());
35
36 match self
37 .socket
38 .read_requests(&mut self.requests_read, SignalBehavior::Interrupt)
39 {
40 Ok(()) if self.requests_read.is_empty() => Ok(false), Ok(())
43 | Err(Error {
44 errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
45 }) => Ok(true),
46 Err(err) => return Err(err),
47 }
48 }
49 pub fn process_requests<Guard, S: SchemeSync>(
50 &mut self,
51 mut acquire_scheme: impl FnMut() -> Guard,
53 ) where
54 Guard: Deref<Target = S> + DerefMut,
55 {
56 for request in self.requests_read.drain(..) {
57 let req = match request.kind() {
58 RequestKind::Call(c) => c,
59 RequestKind::Cancellation(req) => {
60 if let Some((_caller, op)) = self.states.remove(&req.id) {
61 self.responses_to_write
62 .push_back(Response::err(ECANCELED, op));
63 }
64 continue;
65 }
66 RequestKind::OnClose { id } => {
67 acquire_scheme().on_close(id);
68 continue;
69 }
70 RequestKind::SendFd(sendfd_request) => {
71 let result = acquire_scheme().on_sendfd(&sendfd_request);
72 let response = Response::new(result, sendfd_request);
73 self.responses_to_write.push_back(response);
74 continue;
75 }
76 RequestKind::RecvFd(recvfd_request) => {
77 let result = acquire_scheme().on_recvfd(&recvfd_request);
78 let caller = recvfd_request.caller();
79
80 if let Err(Error {
81 errno: errno::EWOULDBLOCK,
82 }) = result
83 {
84 self.states.insert(caller.id, (caller, recvfd_request.op()));
85 continue;
86 }
87 let response = Response::open_dup_like(result, recvfd_request);
88 self.responses_to_write.push_back(response);
89 continue;
90 }
91 _ => continue,
92 };
93 let caller = req.caller();
94 let mut op = match req.op() {
95 Ok(op) => op,
96 Err(req) => {
97 self.responses_to_write
98 .push_back(Response::err(EOPNOTSUPP, req));
99 continue;
100 }
101 };
102 let resp = match op.handle_sync_dont_consume(&caller, &mut *acquire_scheme()) {
103 SchemeResponse::Opened(Err(Error {
104 errno: errno::EWOULDBLOCK,
105 }))
106 | SchemeResponse::Regular(Err(Error {
107 errno: errno::EWOULDBLOCK,
108 })) if !op.is_explicitly_nonblock() => {
109 self.states.insert(caller.id, (caller, op));
110 continue;
111 }
112 SchemeResponse::Regular(r) => Response::new(r, op),
113 SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
114 };
115 self.responses_to_write.push_back(resp);
116 }
117 }
118 pub fn poll_request(&mut self, id: Id, scheme: &mut impl SchemeSync) -> Result<bool> {
121 Ok(
122 match Self::poll_request_inner(id, scheme, &mut self.states)? {
123 ControlFlow::Continue((caller, op)) => {
124 self.states.insert(id, (caller, op));
125 false
126 }
127 ControlFlow::Break(resp) => {
128 self.responses_to_write.push_back(resp);
129 true
130 }
131 },
132 )
133 }
134 fn poll_request_inner(
135 id: Id,
136 scheme: &mut impl SchemeSync,
137 states: &mut HashMap<Id, (CallerCtx, Op)>,
138 ) -> Result<ControlFlow<Response, (CallerCtx, Op)>> {
139 let (caller, mut op) = states.remove(&id).ok_or(Error::new(EIO))?;
140 let resp = match op.handle_sync_dont_consume(&caller, scheme) {
141 SchemeResponse::Opened(Err(Error {
142 errno: errno::EWOULDBLOCK,
143 }))
144 | SchemeResponse::Regular(Err(Error {
145 errno: errno::EWOULDBLOCK,
146 })) if !op.is_explicitly_nonblock() => {
147 return Ok(ControlFlow::Continue((caller, op)));
148 }
149 SchemeResponse::Regular(r) => Response::new(r, op),
150 SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
151 };
152 Ok(ControlFlow::Break(resp))
153 }
154 pub fn poll_ready_requests<S, G>(&mut self, mut acquire_scheme: impl FnMut() -> G) -> Result<()>
155 where
156 S: SchemeSync,
157 G: Deref<Target = S> + DerefMut,
158 {
159 for id in self.ready_queue.drain(..) {
160 match Self::poll_request_inner(id, &mut *acquire_scheme(), &mut self.states)? {
161 ControlFlow::Break(resp) => {
162 self.responses_to_write.push_back(resp);
163 }
164 ControlFlow::Continue((caller, op)) => {
165 self.states.insert(id, (caller, op));
166 }
167 }
168 }
169 Ok(())
170 }
171 pub fn poll_all_requests<S, G>(&mut self, acquire_scheme: impl FnMut() -> G) -> Result<()>
172 where
173 S: SchemeSync,
174 G: Deref<Target = S> + DerefMut,
175 {
176 self.ready_queue.clear();
178 self.ready_queue.extend(self.states.keys().copied());
179 self.poll_ready_requests(acquire_scheme)
180 }
181 pub fn write_responses(&mut self) -> Result<bool> {
182 match self
183 .socket
184 .write_responses(&mut self.responses_to_write, SignalBehavior::Restart)
185 {
186 Ok(())
189 | Err(Error {
190 errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
191 }) => Ok(true),
192 Err(err) => return Err(LError::from(err).into()),
193 }
194 }
195}