reactor/resource.rs
1// Library for concurrent I/O resource management using reactor pattern.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Written in 2021-2023 by
6// Dr. Maxim Orlovsky <orlovsky@ubideco.org>
7// Alexis Sellier <alexis@cloudhead.io>
8//
9// Copyright 2022-2023 UBIDECO Institute, Switzerland
10// Copyright 2021 Alexis Sellier <alexis@cloudhead.io>
11//
12// Licensed under the Apache License, Version 2.0 (the "License");
13// you may not use this file except in compliance with the License.
14// You may obtain a copy of the License at
15//
16// http://www.apache.org/licenses/LICENSE-2.0
17//
18// Unless required by applicable law or agreed to in writing, software
19// distributed under the License is distributed on an "AS IS" BASIS,
20// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21// See the License for the specific language governing permissions and
22// limitations under the License.
23
24use std::fmt::Debug;
25use std::hash::Hash;
26use std::io::{self, ErrorKind};
27use std::os::unix::io::AsRawFd;
28
29use crate::poller::IoType;
30
31/// I/O events which can be subscribed for - or notified about by the [`crate::Reactor`] on a
32/// specific [`Resource`].
33#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
34pub enum Io {
35 /// Input event
36 Read,
37 /// Output event
38 Write,
39}
40
41/// Type of the resource.
42#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
43pub enum ResourceType {
44 /// Listener resource.
45 Listener,
46 /// Transport resource.
47 Transport,
48}
49
50/// Generator for the new [`ResourceId`]s which should be used by pollers implementing [`Poll`]
51/// trait.
52#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)]
53#[display(inner)]
54pub struct ResourceIdGenerator(u64);
55
56impl Default for ResourceIdGenerator {
57 fn default() -> Self { ResourceIdGenerator(1) }
58}
59
60#[allow(dead_code)] // We need this before we've got non-popol implementations
61impl ResourceIdGenerator {
62 /// Returns the next id for the resource.
63 pub fn next(&mut self) -> ResourceId {
64 let id = self.0;
65 self.0 += 1;
66 ResourceId(id)
67 }
68}
69
70/// The resource identifier must be globally unique and non-reusable object. Because of this,
71/// things like [`RawFd`] and socket addresses can't operate like resource identifiers.
72#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)]
73#[display(inner)]
74pub struct ResourceId(u64);
75
76impl ResourceId {
77 /// Resource id for the waker (always zero).
78 pub const WAKER: ResourceId = ResourceId(0);
79}
80
81/// A resource which can be managed by the reactor.
82pub trait Resource: AsRawFd + WriteAtomic + Send {
83 /// Events which resource may generate upon receiving I/O from the reactor via
84 /// [`Self::handle_io`]. These events are passed to the reactor [`crate::Handler`].
85 type Event;
86
87 /// Method informing the reactor which types of events this resource is subscribed for.
88 fn interests(&self) -> IoType;
89
90 /// Method called by the reactor when an I/O event is received for this resource in a result of
91 /// poll syscall.
92 fn handle_io(&mut self, io: Io) -> Option<Self::Event>;
93}
94
95/// Error during write operation for a reactor-managed [`Resource`].
96#[derive(Debug, Display, Error, From)]
97pub enum WriteError {
98 /// Underlying resource is not ready to accept the data: for instance,
99 /// a connection has not yet established in full or handshake is not
100 /// complete. A specific case in which this error is returned is defined
101 /// by an underlying resource type; however, this error happens only
102 /// due to a business logic bugs in a [`crate::reactor::Handler`]
103 /// implementation.
104 #[display("resource not ready to accept the data")]
105 NotReady,
106
107 /// Error returned by the operating system and not by the resource itself.
108 #[display(inner)]
109 #[from]
110 Io(io::Error),
111}
112
113/// The trait guarantees that the data are either written in full - or, in case
114/// of an error, none of the data is written. Types implementing the trait must
115/// also guarantee that multiple attempts to do the write would not result in
116/// data to be written out of the initial ordering.
117pub trait WriteAtomic: io::Write {
118 /// Atomic non-blocking I/O write operation, which must either write the whole buffer to a
119 /// resource without blocking - or fail with [`WriteError::NotReady`] error.
120 ///
121 /// # Safety
122 ///
123 /// Panics on invalid [`WriteAtomic::write_or_buf`] implementation, i.e. if it doesn't handle
124 /// EGAGAIN, EINTER, EWOULDBLOCK I/O errors by buffering the data and returns them instead.
125 fn write_atomic(&mut self, buf: &[u8]) -> Result<(), WriteError> {
126 if !self.is_ready_to_write() {
127 Err(WriteError::NotReady)
128 } else {
129 self.write_or_buf(buf).map_err(|err| {
130 debug_assert!(
131 !matches!(
132 err.kind(),
133 ErrorKind::WouldBlock | ErrorKind::Interrupted | ErrorKind::WriteZero
134 ),
135 "WriteAtomic::write_or_buf must handle EGAGAIN, EINTR, EWOULDBLOCK errors by \
136 buffering the data"
137 );
138 WriteError::from(err)
139 })
140 }
141 }
142
143 /// Checks whether resource can be written to without blocking.
144 fn is_ready_to_write(&self) -> bool;
145
146 /// Empties any write buffers in a non-blocking way. If a non-blocking
147 /// operation is not possible, errors with [`io::ErrorKind::WouldBlock`]
148 /// kind of [`io::Error`].
149 ///
150 /// # Returns
151 ///
152 /// If the buffer contained any data before this operation.
153 fn empty_write_buf(&mut self) -> io::Result<bool>;
154
155 #[doc(hidden)]
156 /// Writes to the resource in a non-blocking way, buffering the data if necessary - or failing
157 /// with a system-level error.
158 ///
159 /// This method shouldn't be called directly; [`Self::write_atomic`] must be used instead.
160 ///
161 /// # Safety
162 ///
163 /// The method must handle EGAGAIN, EINTER, EWOULDBLOCK I/O errors and buffer the data in such
164 /// cases. Ig these errors are returned from this methods [`WriteAtomic::write_atomic`] will
165 /// panic.
166 fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>;
167}