reactor/
resource.rs

1// Library for concurrent I/O resource management using reactor pattern.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Written in 2021-2023 by
6//     Dr. Maxim Orlovsky <orlovsky@ubideco.org>
7//     Alexis Sellier <alexis@cloudhead.io>
8//
9// Copyright 2022-2023 UBIDECO Institute, Switzerland
10// Copyright 2021 Alexis Sellier <alexis@cloudhead.io>
11//
12// Licensed under the Apache License, Version 2.0 (the "License");
13// you may not use this file except in compliance with the License.
14// You may obtain a copy of the License at
15//
16//     http://www.apache.org/licenses/LICENSE-2.0
17//
18// Unless required by applicable law or agreed to in writing, software
19// distributed under the License is distributed on an "AS IS" BASIS,
20// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21// See the License for the specific language governing permissions and
22// limitations under the License.
23
24use std::fmt::Debug;
25use std::hash::Hash;
26use std::io::{self, ErrorKind};
27use std::os::unix::io::AsRawFd;
28
29use crate::poller::IoType;
30
/// Kind of I/O event on a specific [`Resource`]: used both when subscribing for events and when
/// the [`crate::Reactor`] notifies about them.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Io {
    /// The input (read) event.
    Read,
    /// The output (write) event.
    Write,
}
40
/// Kind of a resource.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum ResourceType {
    /// The resource is a listener.
    Listener,
    /// The resource is a transport.
    Transport,
}
49
50/// Generator for the new [`ResourceId`]s which should be used by pollers implementing [`Poll`]
51/// trait.
52#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)]
53#[display(inner)]
54pub struct ResourceIdGenerator(u64);
55
56impl Default for ResourceIdGenerator {
57    fn default() -> Self { ResourceIdGenerator(1) }
58}
59
60#[allow(dead_code)] // We need this before we've got non-popol implementations
61impl ResourceIdGenerator {
62    /// Returns the next id for the resource.
63    pub fn next(&mut self) -> ResourceId {
64        let id = self.0;
65        self.0 += 1;
66        ResourceId(id)
67    }
68}
69
70/// The resource identifier must be globally unique and non-reusable object. Because of this,
71/// things like [`RawFd`] and socket addresses can't operate like resource identifiers.
72#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)]
73#[display(inner)]
74pub struct ResourceId(u64);
75
76impl ResourceId {
77    /// Resource id for the waker (always zero).
78    pub const WAKER: ResourceId = ResourceId(0);
79}
80
81/// A resource which can be managed by the reactor.
82pub trait Resource: AsRawFd + WriteAtomic + Send {
83    /// Events which resource may generate upon receiving I/O from the reactor via
84    /// [`Self::handle_io`]. These events are passed to the reactor [`crate::Handler`].
85    type Event;
86
87    /// Method informing the reactor which types of events this resource is subscribed for.
88    fn interests(&self) -> IoType;
89
90    /// Method called by the reactor when an I/O event is received for this resource in a result of
91    /// poll syscall.
92    fn handle_io(&mut self, io: Io) -> Option<Self::Event>;
93}
94
95/// Error during write operation for a reactor-managed [`Resource`].
96#[derive(Debug, Display, Error, From)]
97pub enum WriteError {
98    /// Underlying resource is not ready to accept the data: for instance,
99    /// a connection has not yet established in full or handshake is not
100    /// complete. A specific case in which this error is returned is defined
101    /// by an underlying resource type; however, this error happens only
102    /// due to a business logic bugs in a [`crate::reactor::Handler`]
103    /// implementation.
104    #[display("resource not ready to accept the data")]
105    NotReady,
106
107    /// Error returned by the operating system and not by the resource itself.
108    #[display(inner)]
109    #[from]
110    Io(io::Error),
111}
112
113/// The trait guarantees that the data are either written in full - or, in case
114/// of an error, none of the data is written. Types implementing the trait must
115/// also guarantee that multiple attempts to do the write would not result in
116/// data to be written out of the initial ordering.
117pub trait WriteAtomic: io::Write {
118    /// Atomic non-blocking I/O write operation, which must either write the whole buffer to a
119    /// resource without blocking - or fail with [`WriteError::NotReady`] error.
120    ///
121    /// # Safety
122    ///
123    /// Panics on invalid [`WriteAtomic::write_or_buf`] implementation, i.e. if it doesn't handle
124    /// EGAGAIN, EINTER, EWOULDBLOCK I/O errors by buffering the data and returns them instead.
125    fn write_atomic(&mut self, buf: &[u8]) -> Result<(), WriteError> {
126        if !self.is_ready_to_write() {
127            Err(WriteError::NotReady)
128        } else {
129            self.write_or_buf(buf).map_err(|err| {
130                debug_assert!(
131                    !matches!(
132                        err.kind(),
133                        ErrorKind::WouldBlock | ErrorKind::Interrupted | ErrorKind::WriteZero
134                    ),
135                    "WriteAtomic::write_or_buf must handle EGAGAIN, EINTR, EWOULDBLOCK errors by \
136                     buffering the data"
137                );
138                WriteError::from(err)
139            })
140        }
141    }
142
143    /// Checks whether resource can be written to without blocking.
144    fn is_ready_to_write(&self) -> bool;
145
146    /// Empties any write buffers in a non-blocking way. If a non-blocking
147    /// operation is not possible, errors with [`io::ErrorKind::WouldBlock`]
148    /// kind of [`io::Error`].
149    ///
150    /// # Returns
151    ///
152    /// If the buffer contained any data before this operation.
153    fn empty_write_buf(&mut self) -> io::Result<bool>;
154
155    #[doc(hidden)]
156    /// Writes to the resource in a non-blocking way, buffering the data if necessary - or failing
157    /// with a system-level error.
158    ///
159    /// This method shouldn't be called directly; [`Self::write_atomic`] must be used instead.
160    ///
161    /// # Safety
162    ///
163    /// The method must handle EGAGAIN, EINTER, EWOULDBLOCK I/O errors and buffer the data in such
164    /// cases. Ig these errors are returned from this methods [`WriteAtomic::write_atomic`] will
165    /// panic.
166    fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>;
167}