redox_scheme/
wrappers.rs

1use core::ops::{Deref, DerefMut};
2use std::collections::{HashMap, VecDeque};
3use std::ops::ControlFlow;
4
5use libredox::error::Error as LError;
6
7use syscall::error::{self as errno, ECANCELED, EIO, EOPNOTSUPP};
8use syscall::{Error, Result};
9
10use crate::scheme::{Op, SchemeResponse, SchemeSync};
11use crate::{CallerCtx, Id, Request, RequestKind, Response, SignalBehavior, Socket};
12
/// Request loop helper around a scheme [`Socket`] for handlers that may report
/// "not ready yet".
///
/// Requests are read into a buffer, handled synchronously, and the resulting
/// responses are queued for writing. A call whose handler fails with
/// `EAGAIN`/`EWOULDBLOCK` (and which is not explicitly non-blocking) is kept in
/// `states` so it can be retried later via the `poll_*` methods.
pub struct ReadinessBased<'sock> {
    // TODO: VecDeque for both when it implements spare_capacity
    /// Requests filled in by `read_requests` and drained by `process_requests`.
    requests_read: Vec<Request>,
    /// Responses waiting to be handed to the socket by `write_responses`.
    responses_to_write: VecDeque<Response>,

    /// Calls that are currently parked, keyed by request id, together with the
    /// caller context needed to run them again.
    states: HashMap<Id, (CallerCtx, Op)>,
    /// Ids scheduled for the next `poll_ready_requests` pass. In the code
    /// visible here it is only filled by `poll_all_requests`.
    ready_queue: VecDeque<Id>,

    /// Socket that requests are read from and responses are written to.
    socket: &'sock Socket,
}
23impl<'sock> ReadinessBased<'sock> {
24    pub fn new(socket: &'sock Socket, queue_size: usize) -> Self {
25        Self {
26            requests_read: Vec::with_capacity(queue_size),
27            responses_to_write: VecDeque::with_capacity(queue_size),
28            states: HashMap::new(),
29            socket,
30            ready_queue: VecDeque::new(),
31        }
32    }
33    pub fn read_requests(&mut self) -> Result<bool> {
34        assert!(self.requests_read.is_empty());
35
36        match self
37            .socket
38            .read_requests(&mut self.requests_read, SignalBehavior::Interrupt)
39        {
40            // TODO: write ECANCELED or similar for everything in self.states?
41            Ok(()) if self.requests_read.is_empty() => Ok(false), // EOF
42            Ok(())
43            | Err(Error {
44                errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
45            }) => Ok(true),
46            Err(err) => return Err(err),
47        }
48    }
49    pub fn process_requests<Guard, S: SchemeSync>(
50        &mut self,
51        // TODO: Maybe change this to &mut SchemeSync, requires converting e.g. audiod
52        mut acquire_scheme: impl FnMut() -> Guard,
53    ) where
54        Guard: Deref<Target = S> + DerefMut,
55    {
56        for request in self.requests_read.drain(..) {
57            let req = match request.kind() {
58                RequestKind::Call(c) => c,
59                RequestKind::Cancellation(req) => {
60                    if let Some((_caller, op)) = self.states.remove(&req.id) {
61                        self.responses_to_write
62                            .push_back(Response::err(ECANCELED, op));
63                    }
64                    continue;
65                }
66                RequestKind::OnClose { id } => {
67                    acquire_scheme().on_close(id);
68                    continue;
69                }
70                _ => continue,
71            };
72            let caller = req.caller();
73            let mut op = match req.op() {
74                Ok(op) => op,
75                Err(req) => {
76                    self.responses_to_write
77                        .push_back(Response::err(EOPNOTSUPP, req));
78                    continue;
79                }
80            };
81            let resp = match op.handle_sync_dont_consume(&caller, &mut *acquire_scheme()) {
82                SchemeResponse::Opened(Err(Error {
83                    errno: errno::EWOULDBLOCK | errno::EAGAIN,
84                }))
85                | SchemeResponse::Regular(Err(Error {
86                    errno: errno::EWOULDBLOCK | errno::EAGAIN,
87                })) if !op.is_explicitly_nonblock() => {
88                    self.states.insert(caller.id, (caller, op));
89                    continue;
90                }
91                SchemeResponse::Regular(r) => Response::new(r, op),
92                SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
93            };
94            self.responses_to_write.push_back(resp);
95        }
96    }
97    // TODO: Doesn't scale. Instead, provide an API for some form of queue.
98    // TODO: panic if id isn't present?
99    pub fn poll_request(&mut self, id: Id, scheme: &mut impl SchemeSync) -> Result<bool> {
100        Ok(
101            match Self::poll_request_inner(id, scheme, &mut self.states)? {
102                ControlFlow::Continue((caller, op)) => {
103                    self.states.insert(id, (caller, op));
104                    false
105                }
106                ControlFlow::Break(resp) => {
107                    self.responses_to_write.push_back(resp);
108                    true
109                }
110            },
111        )
112    }
113    fn poll_request_inner(
114        id: Id,
115        scheme: &mut impl SchemeSync,
116        states: &mut HashMap<Id, (CallerCtx, Op)>,
117    ) -> Result<ControlFlow<Response, (CallerCtx, Op)>> {
118        let (caller, mut op) = states.remove(&id).ok_or(Error::new(EIO))?;
119        let resp = match op.handle_sync_dont_consume(&caller, scheme) {
120            SchemeResponse::Opened(Err(Error {
121                errno: errno::EWOULDBLOCK | errno::EAGAIN,
122            }))
123            | SchemeResponse::Regular(Err(Error {
124                errno: errno::EWOULDBLOCK | errno::EAGAIN,
125            })) if !op.is_explicitly_nonblock() => {
126                return Ok(ControlFlow::Continue((caller, op)));
127            }
128            SchemeResponse::Regular(r) => Response::new(r, op),
129            SchemeResponse::Opened(o) => Response::open_dup_like(o, op),
130        };
131        Ok(ControlFlow::Break(resp))
132    }
133    pub fn poll_ready_requests<S, G>(&mut self, mut acquire_scheme: impl FnMut() -> G) -> Result<()>
134    where
135        S: SchemeSync,
136        G: Deref<Target = S> + DerefMut,
137    {
138        for id in self.ready_queue.drain(..) {
139            match Self::poll_request_inner(id, &mut *acquire_scheme(), &mut self.states)? {
140                ControlFlow::Break(resp) => {
141                    self.responses_to_write.push_back(resp);
142                }
143                ControlFlow::Continue((caller, op)) => {
144                    self.states.insert(id, (caller, op));
145                }
146            }
147        }
148        Ok(())
149    }
150    pub fn poll_all_requests<S, G>(&mut self, acquire_scheme: impl FnMut() -> G) -> Result<()>
151    where
152        S: SchemeSync,
153        G: Deref<Target = S> + DerefMut,
154    {
155        // TODO: implement waker-like API
156        self.ready_queue.clear();
157        self.ready_queue.extend(self.states.keys().copied());
158        self.poll_ready_requests(acquire_scheme)
159    }
160    pub fn write_responses(&mut self) -> Result<bool> {
161        match self
162            .socket
163            .write_responses(&mut self.responses_to_write, SignalBehavior::Restart)
164        {
165            // TODO: write ECANCELED or similar for everything in self.states?
166            //Ok(()) if !self.responses_to_write.is_empty() => Ok(false), // EOF, FIXME
167            Ok(())
168            | Err(Error {
169                errno: errno::EINTR | errno::EWOULDBLOCK | errno::EAGAIN,
170            }) => Ok(true),
171            Err(err) => return Err(LError::from(err).into()),
172        }
173    }
174}