1use core::ops::{Deref, DerefMut};
2use std::collections::{HashMap, VecDeque};
3use std::ops::ControlFlow;
4
5use libredox::error::Error as LError;
6
7use syscall::error::{self as errno, ECANCELED, EIO, EOPNOTSUPP};
8use syscall::{Error, Result};
9
10use crate::scheme::{Op, SchemeResponse, SchemeSync};
11use crate::{CallerCtx, Id, Request, RequestKind, Response, SignalBehavior, Socket};
12
13pub struct ReadinessBased<'sock> {
14 requests_read: Vec<Request>,
16 responses_to_write: VecDeque<Response>,
17
18 states: HashMap<Id, (CallerCtx, Op)>,
19 ready_queue: VecDeque<Id>,
20
21 socket: &'sock Socket,
22}
23impl<'sock> ReadinessBased<'sock> {
24 pub fn new(socket: &'sock Socket, queue_size: usize) -> Self {
25 Self {
26 requests_read: Vec::with_capacity(queue_size),
27 responses_to_write: VecDeque::with_capacity(queue_size),
28 states: HashMap::new(),
29 socket,
30 ready_queue: VecDeque::new(),
31 }
32 }
33 pub fn read_requests(&mut self) -> Result<bool> {
34 assert!(self.requests_read.is_empty());
35
36 match self
37 .socket
38 .read_requests(&mut self.requests_read, SignalBehavior::Interrupt)
39 {
40 Ok(()) if self.requests_read.is_empty() => Ok(false), Ok(())
43 | Err(Error {
44 errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
45 }) => Ok(true),
46 Err(err) => return Err(err),
47 }
48 }
49 pub fn process_requests<Guard, S: SchemeSync>(
50 &mut self,
51 mut acquire_scheme: impl FnMut() -> Guard,
53 ) where
54 Guard: Deref<Target = S> + DerefMut,
55 {
56 for request in self.requests_read.drain(..) {
57 let req = match request.kind() {
58 RequestKind::Call(c) => c,
59 RequestKind::Cancellation(req) => {
60 if let Some((_caller, op)) = self.states.remove(&req.id) {
61 self.responses_to_write
62 .push_back(Response::err(ECANCELED, op));
63 }
64 continue;
65 }
66 RequestKind::OnClose { id } => {
67 acquire_scheme().on_close(id);
68 continue;
69 }
70 _ => continue,
71 };
72 let caller = req.caller();
73 let mut op = match req.op() {
74 Ok(op) => op,
75 Err(req) => {
76 self.responses_to_write
77 .push_back(Response::err(EOPNOTSUPP, req));
78 continue;
79 }
80 };
81 let resp = match op.handle_sync_dont_consume(&caller, &mut *acquire_scheme()) {
82 SchemeResponse::Opened(Err(Error {
83 errno: errno::EWOULDBLOCK | errno::EAGAIN,
84 }))
85 | SchemeResponse::Regular(Err(Error {
86 errno: errno::EWOULDBLOCK | errno::EAGAIN,
87 })) if !op.is_explicitly_nonblock() => {
88 self.states.insert(caller.id, (caller, op));
89 continue;
90 }
91 SchemeResponse::Regular(r) => Response::new(r, op),
92 SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
93 };
94 self.responses_to_write.push_back(resp);
95 }
96 }
97 pub fn poll_request(&mut self, id: Id, scheme: &mut impl SchemeSync) -> Result<bool> {
100 Ok(
101 match Self::poll_request_inner(id, scheme, &mut self.states)? {
102 ControlFlow::Continue((caller, op)) => {
103 self.states.insert(id, (caller, op));
104 false
105 }
106 ControlFlow::Break(resp) => {
107 self.responses_to_write.push_back(resp);
108 true
109 }
110 },
111 )
112 }
113 fn poll_request_inner(
114 id: Id,
115 scheme: &mut impl SchemeSync,
116 states: &mut HashMap<Id, (CallerCtx, Op)>,
117 ) -> Result<ControlFlow<Response, (CallerCtx, Op)>> {
118 let (caller, mut op) = states.remove(&id).ok_or(Error::new(EIO))?;
119 let resp = match op.handle_sync_dont_consume(&caller, scheme) {
120 SchemeResponse::Opened(Err(Error {
121 errno: errno::EWOULDBLOCK | errno::EAGAIN,
122 }))
123 | SchemeResponse::Regular(Err(Error {
124 errno: errno::EWOULDBLOCK | errno::EAGAIN,
125 })) if !op.is_explicitly_nonblock() => {
126 return Ok(ControlFlow::Continue((caller, op)));
127 }
128 SchemeResponse::Regular(r) => Response::new(r, op),
129 SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
130 };
131 Ok(ControlFlow::Break(resp))
132 }
133 pub fn poll_ready_requests<S, G>(&mut self, mut acquire_scheme: impl FnMut() -> G) -> Result<()>
134 where
135 S: SchemeSync,
136 G: Deref<Target = S> + DerefMut,
137 {
138 for id in self.ready_queue.drain(..) {
139 match Self::poll_request_inner(id, &mut *acquire_scheme(), &mut self.states)? {
140 ControlFlow::Break(resp) => {
141 self.responses_to_write.push_back(resp);
142 }
143 ControlFlow::Continue((caller, op)) => {
144 self.states.insert(id, (caller, op));
145 }
146 }
147 }
148 Ok(())
149 }
150 pub fn poll_all_requests<S, G>(&mut self, acquire_scheme: impl FnMut() -> G) -> Result<()>
151 where
152 S: SchemeSync,
153 G: Deref<Target = S> + DerefMut,
154 {
155 self.ready_queue.clear();
157 self.ready_queue.extend(self.states.keys().copied());
158 self.poll_ready_requests(acquire_scheme)
159 }
160 pub fn write_responses(&mut self) -> Result<bool> {
161 match self
162 .socket
163 .write_responses(&mut self.responses_to_write, SignalBehavior::Restart)
164 {
165 Ok(())
168 | Err(Error {
169 errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
170 }) => Ok(true),
171 Err(err) => return Err(LError::from(err).into()),
172 }
173 }
174}