redox_scheme/
wrappers.rs

1use core::ops::{Deref, DerefMut};
2use std::collections::{HashMap, VecDeque};
3use std::ops::ControlFlow;
4
5use libredox::error::Error as LError;
6
7use syscall::error::{self as errno, Error, ECANCELED, EIO, EOPNOTSUPP};
8use syscall::Result;
9
10use crate::scheme::{Op, SchemeResponse, SchemeSync};
11use crate::{CallerCtx, Id, Request, RequestKind, Response, SignalBehavior, Socket};
12
13pub struct ReadinessBased<'sock> {
14    // TODO: VecDeque for both when it implements spare_capacity
15    requests_read: Vec<Request>,
16    responses_to_write: VecDeque<Response>,
17
18    states: HashMap<Id, (CallerCtx, Op)>,
19    ready_queue: VecDeque<Id>,
20
21    socket: &'sock Socket,
22}
23impl<'sock> ReadinessBased<'sock> {
24    pub fn new(socket: &'sock Socket, queue_size: usize) -> Self {
25        Self {
26            requests_read: Vec::with_capacity(queue_size),
27            responses_to_write: VecDeque::with_capacity(queue_size),
28            states: HashMap::new(),
29            socket,
30            ready_queue: VecDeque::new(),
31        }
32    }
33    pub fn read_requests(&mut self) -> Result<bool> {
34        assert!(self.requests_read.is_empty());
35
36        match self
37            .socket
38            .read_requests(&mut self.requests_read, SignalBehavior::Interrupt)
39        {
40            // TODO: write ECANCELED or similar for everything in self.states?
41            Ok(()) if self.requests_read.is_empty() => Ok(false), // EOF
42            Ok(())
43            | Err(Error {
44                errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
45            }) => Ok(true),
46            Err(err) => return Err(err),
47        }
48    }
49    pub fn process_requests<Guard, S: SchemeSync>(
50        &mut self,
51        // TODO: Maybe change this to &mut SchemeSync, requires converting e.g. audiod
52        mut acquire_scheme: impl FnMut() -> Guard,
53    ) where
54        Guard: Deref<Target = S> + DerefMut,
55    {
56        for request in self.requests_read.drain(..) {
57            let req = match request.kind() {
58                RequestKind::Call(c) => c,
59                RequestKind::Cancellation(req) => {
60                    if let Some((_caller, op)) = self.states.remove(&req.id) {
61                        self.responses_to_write
62                            .push_back(Response::err(ECANCELED, op));
63                    }
64                    continue;
65                }
66                RequestKind::OnClose { id } => {
67                    acquire_scheme().on_close(id);
68                    continue;
69                }
70                RequestKind::SendFd(sendfd_request) => {
71                    let result = acquire_scheme().on_sendfd(&sendfd_request);
72                    let response = Response::new(result, sendfd_request);
73                    self.responses_to_write.push_back(response);
74                    continue;
75                }
76                RequestKind::RecvFd(recvfd_request) => {
77                    let result = acquire_scheme().on_recvfd(&recvfd_request);
78                    let caller = recvfd_request.caller();
79
80                    if let Err(Error {
81                        errno: errno::EWOULDBLOCK,
82                    }) = result
83                    {
84                        self.states.insert(caller.id, (caller, recvfd_request.op()));
85                        continue;
86                    }
87                    let response = Response::open_dup_like(result, recvfd_request);
88                    self.responses_to_write.push_back(response);
89                    continue;
90                }
91                _ => continue,
92            };
93            let caller = req.caller();
94            let mut op = match req.op() {
95                Ok(op) => op,
96                Err(req) => {
97                    self.responses_to_write
98                        .push_back(Response::err(EOPNOTSUPP, req));
99                    continue;
100                }
101            };
102            let resp = match op.handle_sync_dont_consume(&caller, &mut *acquire_scheme()) {
103                SchemeResponse::Opened(Err(Error {
104                    errno: errno::EWOULDBLOCK,
105                }))
106                | SchemeResponse::Regular(Err(Error {
107                    errno: errno::EWOULDBLOCK,
108                })) if !op.is_explicitly_nonblock() => {
109                    self.states.insert(caller.id, (caller, op));
110                    continue;
111                }
112                SchemeResponse::Regular(r) => Response::new(r, op),
113                SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
114            };
115            self.responses_to_write.push_back(resp);
116        }
117    }
118    // TODO: Doesn't scale. Instead, provide an API for some form of queue.
119    // TODO: panic if id isn't present?
120    pub fn poll_request(&mut self, id: Id, scheme: &mut impl SchemeSync) -> Result<bool> {
121        Ok(
122            match Self::poll_request_inner(id, scheme, &mut self.states)? {
123                ControlFlow::Continue((caller, op)) => {
124                    self.states.insert(id, (caller, op));
125                    false
126                }
127                ControlFlow::Break(resp) => {
128                    self.responses_to_write.push_back(resp);
129                    true
130                }
131            },
132        )
133    }
134    fn poll_request_inner(
135        id: Id,
136        scheme: &mut impl SchemeSync,
137        states: &mut HashMap<Id, (CallerCtx, Op)>,
138    ) -> Result<ControlFlow<Response, (CallerCtx, Op)>> {
139        let (caller, mut op) = states.remove(&id).ok_or(Error::new(EIO))?;
140        let resp = match op.handle_sync_dont_consume(&caller, scheme) {
141            SchemeResponse::Opened(Err(Error {
142                errno: errno::EWOULDBLOCK,
143            }))
144            | SchemeResponse::Regular(Err(Error {
145                errno: errno::EWOULDBLOCK,
146            })) if !op.is_explicitly_nonblock() => {
147                return Ok(ControlFlow::Continue((caller, op)));
148            }
149            SchemeResponse::Regular(r) => Response::new(r, op),
150            SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
151        };
152        Ok(ControlFlow::Break(resp))
153    }
154    pub fn poll_ready_requests<S, G>(&mut self, mut acquire_scheme: impl FnMut() -> G) -> Result<()>
155    where
156        S: SchemeSync,
157        G: Deref<Target = S> + DerefMut,
158    {
159        for id in self.ready_queue.drain(..) {
160            match Self::poll_request_inner(id, &mut *acquire_scheme(), &mut self.states)? {
161                ControlFlow::Break(resp) => {
162                    self.responses_to_write.push_back(resp);
163                }
164                ControlFlow::Continue((caller, op)) => {
165                    self.states.insert(id, (caller, op));
166                }
167            }
168        }
169        Ok(())
170    }
171    pub fn poll_all_requests<S, G>(&mut self, acquire_scheme: impl FnMut() -> G) -> Result<()>
172    where
173        S: SchemeSync,
174        G: Deref<Target = S> + DerefMut,
175    {
176        // TODO: implement waker-like API
177        self.ready_queue.clear();
178        self.ready_queue.extend(self.states.keys().copied());
179        self.poll_ready_requests(acquire_scheme)
180    }
181    pub fn write_responses(&mut self) -> Result<bool> {
182        match self
183            .socket
184            .write_responses(&mut self.responses_to_write, SignalBehavior::Restart)
185        {
186            // TODO: write ECANCELED or similar for everything in self.states?
187            //Ok(()) if !self.responses_to_write.is_empty() => Ok(false), // EOF, FIXME
188            Ok(())
189            | Err(Error {
190                errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
191            }) => Ok(true),
192            Err(err) => return Err(LError::from(err).into()),
193        }
194    }
195}