redox_scheme/
wrappers.rs

1use core::ops::{Deref, DerefMut};
2use std::collections::{HashMap, VecDeque};
3use std::ops::ControlFlow;
4
5use libredox::error::Error as LError;
6
7use syscall::error::{self as errno, ECANCELED, EIO, EOPNOTSUPP};
8use syscall::{Error, Result};
9
10use crate::scheme::{Op, SchemeResponse, SchemeSync};
11use crate::{CallerCtx, Id, Request, RequestKind, Response, SignalBehavior, Socket};
12
13pub struct ReadinessBased<'sock> {
14    // TODO: VecDeque for both when it implements spare_capacity
15    requests_read: Vec<Request>,
16    responses_to_write: VecDeque<Response>,
17
18    states: HashMap<Id, (CallerCtx, Op)>,
19    ready_queue: VecDeque<Id>,
20
21    socket: &'sock Socket,
22}
23impl<'sock> ReadinessBased<'sock> {
24    pub fn new(socket: &'sock Socket, queue_size: usize) -> Self {
25        Self {
26            requests_read: Vec::with_capacity(queue_size),
27            responses_to_write: VecDeque::with_capacity(queue_size),
28            states: HashMap::new(),
29            socket,
30            ready_queue: VecDeque::new(),
31        }
32    }
33    pub fn read_requests(&mut self) -> Result<bool> {
34        assert!(self.requests_read.is_empty());
35
36        match self
37            .socket
38            .read_requests(&mut self.requests_read, SignalBehavior::Interrupt)
39        {
40            // TODO: write ECANCELED or similar for everything in self.states?
41            Ok(()) if self.requests_read.is_empty() => Ok(false), // EOF
42            Ok(())
43            | Err(Error {
44                errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
45            }) => Ok(true),
46            Err(err) => return Err(err),
47        }
48    }
49    pub fn process_requests<Guard, S: SchemeSync>(
50        &mut self,
51        // TODO: Maybe change this to &mut SchemeSync, requires converting e.g. audiod
52        mut acquire_scheme: impl FnMut() -> Guard,
53    ) where
54        Guard: Deref<Target = S> + DerefMut,
55    {
56        for request in self.requests_read.drain(..) {
57            let req = match request.kind() {
58                RequestKind::Call(c) => c,
59                RequestKind::Cancellation(req) => {
60                    if let Some((_caller, op)) = self.states.remove(&req.id) {
61                        self.responses_to_write
62                            .push_back(Response::err(ECANCELED, op));
63                    }
64                    continue;
65                }
66                RequestKind::OnClose { id } => {
67                    acquire_scheme().on_close(id);
68                    continue;
69                }
70                RequestKind::SendFd(sendfd_request) => {
71                    let result: Result<usize, Error> = acquire_scheme().on_sendfd(&sendfd_request);
72                    let response = Response::new(result, sendfd_request);
73                    self.responses_to_write.push_back(response);
74                    continue;
75                }
76                _ => continue,
77            };
78            let caller = req.caller();
79            let mut op = match req.op() {
80                Ok(op) => op,
81                Err(req) => {
82                    self.responses_to_write
83                        .push_back(Response::err(EOPNOTSUPP, req));
84                    continue;
85                }
86            };
87            let resp = match op.handle_sync_dont_consume(&caller, &mut *acquire_scheme()) {
88                SchemeResponse::Opened(Err(Error {
89                    errno: errno::EWOULDBLOCK | errno::EAGAIN,
90                }))
91                | SchemeResponse::Regular(Err(Error {
92                    errno: errno::EWOULDBLOCK | errno::EAGAIN,
93                })) if !op.is_explicitly_nonblock() => {
94                    self.states.insert(caller.id, (caller, op));
95                    continue;
96                }
97                SchemeResponse::Regular(r) => Response::new(r, op),
98                SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
99            };
100            self.responses_to_write.push_back(resp);
101        }
102    }
103    // TODO: Doesn't scale. Instead, provide an API for some form of queue.
104    // TODO: panic if id isn't present?
105    pub fn poll_request(&mut self, id: Id, scheme: &mut impl SchemeSync) -> Result<bool> {
106        Ok(
107            match Self::poll_request_inner(id, scheme, &mut self.states)? {
108                ControlFlow::Continue((caller, op)) => {
109                    self.states.insert(id, (caller, op));
110                    false
111                }
112                ControlFlow::Break(resp) => {
113                    self.responses_to_write.push_back(resp);
114                    true
115                }
116            },
117        )
118    }
119    fn poll_request_inner(
120        id: Id,
121        scheme: &mut impl SchemeSync,
122        states: &mut HashMap<Id, (CallerCtx, Op)>,
123    ) -> Result<ControlFlow<Response, (CallerCtx, Op)>> {
124        let (caller, mut op) = states.remove(&id).ok_or(Error::new(EIO))?;
125        let resp = match op.handle_sync_dont_consume(&caller, scheme) {
126            SchemeResponse::Opened(Err(Error {
127                errno: errno::EWOULDBLOCK | errno::EAGAIN,
128            }))
129            | SchemeResponse::Regular(Err(Error {
130                errno: errno::EWOULDBLOCK | errno::EAGAIN,
131            })) if !op.is_explicitly_nonblock() => {
132                return Ok(ControlFlow::Continue((caller, op)));
133            }
134            SchemeResponse::Regular(r) => Response::new(r, op),
135            SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
136        };
137        Ok(ControlFlow::Break(resp))
138    }
139    pub fn poll_ready_requests<S, G>(&mut self, mut acquire_scheme: impl FnMut() -> G) -> Result<()>
140    where
141        S: SchemeSync,
142        G: Deref<Target = S> + DerefMut,
143    {
144        for id in self.ready_queue.drain(..) {
145            match Self::poll_request_inner(id, &mut *acquire_scheme(), &mut self.states)? {
146                ControlFlow::Break(resp) => {
147                    self.responses_to_write.push_back(resp);
148                }
149                ControlFlow::Continue((caller, op)) => {
150                    self.states.insert(id, (caller, op));
151                }
152            }
153        }
154        Ok(())
155    }
156    pub fn poll_all_requests<S, G>(&mut self, acquire_scheme: impl FnMut() -> G) -> Result<()>
157    where
158        S: SchemeSync,
159        G: Deref<Target = S> + DerefMut,
160    {
161        // TODO: implement waker-like API
162        self.ready_queue.clear();
163        self.ready_queue.extend(self.states.keys().copied());
164        self.poll_ready_requests(acquire_scheme)
165    }
166    pub fn write_responses(&mut self) -> Result<bool> {
167        match self
168            .socket
169            .write_responses(&mut self.responses_to_write, SignalBehavior::Restart)
170        {
171            // TODO: write ECANCELED or similar for everything in self.states?
172            //Ok(()) if !self.responses_to_write.is_empty() => Ok(false), // EOF, FIXME
173            Ok(())
174            | Err(Error {
175                errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
176            }) => Ok(true),
177            Err(err) => return Err(LError::from(err).into()),
178        }
179    }
180}