use std::{
fmt::{self, Debug},
io,
os::unix::io::RawFd,
path::{Path, PathBuf},
time::Duration,
};
/// Result alias whose error type is fixed to [`GlommioError<V>`].
///
/// `T` is the success value. `V` is forwarded as the type parameter of
/// [`GlommioError`], where it ends up as the payload type of
/// [`ResourceType::Channel`].
pub type Result<T, V> = std::result::Result<T, GlommioError<V>>;
/// Identifies the kind of resource an error refers to.
///
/// Used as the payload of [`GlommioError::Closed`],
/// [`GlommioError::CanNotBeClosed`] and [`GlommioError::WouldBlock`].
#[derive(Debug)]
pub enum ResourceType<T> {
/// A semaphore, together with the two counts that are echoed in error
/// messages.
Semaphore {
/// Printed as "requested" in error messages.
requested: u64,
/// Printed as "available" in error messages.
available: u64,
},
/// A read-write lock.
RwLock,
/// A channel. The carried `T` is ignored by the `Display` and `Debug`
/// implementations of [`GlommioError`].
Channel(T),
/// A file. The `String` is a free-form message that is included in the
/// error text.
File(String),
/// A gate.
Gate,
}
/// The specific failure carried by [`ExecutorErrorKind::QueueError`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum QueueErrorKind {
/// Displayed as "still active".
StillActive,
/// Displayed as "not found".
NotFound,
}
/// Errors carried by [`GlommioError::ReactorError`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReactorErrorKind {
    /// A source had an incorrect type; the `String` is echoed (debug-quoted)
    /// in the message.
    IncorrectSourceType(String),
    /// The memlock resource limit is too low. The first value is printed as
    /// the limit that is too low, the second as the recommended value.
    MemLockLimit(u64, u64),
}

impl fmt::Display for ReactorErrorKind {
    /// Writes the human-readable description of the reactor error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::IncorrectSourceType(source_type) => {
                f.write_fmt(format_args!("Incorrect source type: {:?}!", source_type))
            }
            Self::MemLockLimit(current, recommended) => f.write_fmt(format_args!(
                "The memlock resource limit is too low: {} (recommended {})",
                current, recommended
            )),
        }
    }
}
/// Errors carried by [`GlommioError::ExecutorError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutorErrorKind {
/// A problem with the queue at `index`; `kind` says what went wrong.
QueueError {
/// Index of the queue, printed as `Queue #<index>`.
index: usize,
/// The specific queue failure.
kind: QueueErrorKind,
},
/// An executor was indexed with an id that is invalid; carries that id.
InvalidId(usize),
}
impl fmt::Display for ExecutorErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExecutorErrorKind::QueueError { index, kind } => {
write!(f, "Queue #{} is {}", index, kind)
}
ExecutorErrorKind::InvalidId(x) => {
write!(f, "indexing executor with id {}, which is invalid", x)
}
}
}
}
/// Errors carried by [`GlommioError::BuilderError`].
#[derive(Debug)]
pub enum BuilderErrorKind {
    /// The given CPU does not exist on the host.
    NonExistentCpus {
        /// The CPU that does not exist.
        cpu: usize,
    },
    /// Fewer CPUs were found than are required.
    InsufficientCpus {
        /// Number of CPUs that are required.
        required: usize,
        /// Number of CPUs that were found.
        available: usize,
    },
    /// The requested number of shards is below the required minimum.
    NrShards {
        /// Minimum number of shards that is required.
        minimum: usize,
        /// Number of shards that was requested.
        shards: usize,
    },
    /// A thread panicked; carries the panic payload, which is not printed.
    ThreadPanic(Box<dyn std::any::Any + Send>),
}

impl fmt::Display for BuilderErrorKind {
    /// Writes the human-readable description of the builder error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NonExistentCpus { cpu } => {
                write!(f, "CPU {} doesn't exist on the host", cpu)
            }
            Self::InsufficientCpus {
                available,
                required,
            } => write!(f, "found {} of {} required CPUs", available, required),
            // The requested count comes first in the sentence and the minimum
            // second, so the arguments must be passed in that order.
            Self::NrShards { minimum, shards } => write!(
                f,
                "requested {} shards but a minimum of {} is required",
                shards, minimum
            ),
            Self::ThreadPanic(_) => write!(f, "thread panicked"),
        }
    }
}
/// The error type used by this module's [`Result`] alias.
///
/// `T` is forwarded to [`ResourceType`], where it is the payload type of
/// `ResourceType::Channel`. `Debug` and `Display` are implemented by hand in
/// this file rather than derived.
pub enum GlommioError<T> {
/// A plain I/O error without extra context.
IoError(io::Error),
/// An I/O error annotated with the operation and, optionally, the path and
/// file descriptor it relates to.
EnhancedIoError {
/// The underlying I/O error.
source: io::Error,
/// Name of the operation; printed after "op:" in the message.
op: &'static str,
/// Path involved, if any.
path: Option<PathBuf>,
/// File descriptor involved, if any.
fd: Option<RawFd>,
},
/// An executor-related error.
ExecutorError(ExecutorErrorKind),
/// An executor-builder-related error.
BuilderError(BuilderErrorKind),
/// The resource is closed.
Closed(ResourceType<T>),
/// The resource can not be closed; the string gives the reason.
CanNotBeClosed(ResourceType<T>, &'static str),
/// The operation on the resource would block.
WouldBlock(ResourceType<T>),
/// A reactor-related error.
ReactorError(ReactorErrorKind),
/// An operation timed out after the given duration.
TimedOut(Duration),
}
impl<T> From<io::Error> for GlommioError<T> {
fn from(err: io::Error) -> Self {
GlommioError::IoError(err)
}
}
impl<T> fmt::Display for GlommioError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
GlommioError::IoError(err) => write!(f, "IO error occurred: {}", err),
GlommioError::EnhancedIoError {
source,
op,
path,
fd,
} => write!(
f,
"{}, op: {} path: {:?} with fd: {:?}",
source, op, path, fd
),
GlommioError::ExecutorError(err) => write!(f, "Executor error: {}", err),
GlommioError::BuilderError(err) => write!(f, "Executor builder error: {}", err),
GlommioError::Closed(rt) => match rt {
ResourceType::Semaphore {
requested,
available,
} => write!(
f,
"Semaphore is closed (requested: {}, available: {})",
requested, available
),
ResourceType::RwLock => write!(f, "RwLock is closed"),
ResourceType::Channel(_) => write!(f, "Channel is closed"),
ResourceType::File(msg) => write!(f, "File is closed ({})", msg),
ResourceType::Gate => write!(f, "Gate is closed"),
},
GlommioError::CanNotBeClosed(_, s) => write!(
f,
"Can not be closed because certain conditions are not satisfied. {}",
*s
),
GlommioError::WouldBlock(rt) => match rt {
ResourceType::Semaphore {
requested,
available,
} => write!(
f,
"Semaphore operation would block (requested: {}, available: {})",
requested, available
),
ResourceType::RwLock => write!(f, "RwLock operation would block"),
ResourceType::Channel(_) => write!(f, "Channel operation would block"),
ResourceType::File(msg) => write!(f, "File operation would block ({})", msg),
ResourceType::Gate => write!(f, "Gate operation would block"),
},
GlommioError::ReactorError(err) => write!(f, "Reactor error: {}", err),
GlommioError::TimedOut(dur) => write!(f, "Operation timed out after {:#?}", dur),
}
}
}
// Marker implementation: every `std::error::Error` method keeps its default,
// so `source()` returns `None` even for the variants that wrap an `io::Error`.
impl<T> std::error::Error for GlommioError<T> {}
impl GlommioError<()> {
pub(crate) fn create_enhanced<P: AsRef<Path>>(
source: io::Error,
op: &'static str,
path: Option<P>,
fd: Option<RawFd>,
) -> GlommioError<()> {
GlommioError::EnhancedIoError {
source,
op,
path: path.map(|path| path.as_ref().to_path_buf()),
fd,
}
}
}
impl<T> From<(io::Error, ResourceType<T>)> for GlommioError<T> {
fn from(tuple: (io::Error, ResourceType<T>)) -> Self {
match tuple.0.kind() {
io::ErrorKind::BrokenPipe => GlommioError::Closed(tuple.1),
io::ErrorKind::WouldBlock => GlommioError::WouldBlock(tuple.1),
_ => tuple.0.into(),
}
}
}
impl<T> GlommioError<T> {
    /// Returns the raw OS error code of the wrapped I/O error, if any.
    ///
    /// Only `IoError` and `EnhancedIoError` wrap an `io::Error`; every other
    /// variant yields `None`.
    pub fn raw_os_error(&self) -> Option<i32> {
        match self {
            Self::IoError(source) | Self::EnhancedIoError { source, .. } => {
                source.raw_os_error()
            }
            _ => None,
        }
    }

    /// Creates the executor error for a queue at `index` that is still active.
    pub(crate) fn queue_still_active(index: usize) -> GlommioError<T> {
        let kind = QueueErrorKind::StillActive;
        Self::ExecutorError(ExecutorErrorKind::QueueError { index, kind })
    }

    /// Creates the executor error for a queue at `index` that was not found.
    pub(crate) fn queue_not_found(index: usize) -> GlommioError<T> {
        let kind = QueueErrorKind::NotFound;
        Self::ExecutorError(ExecutorErrorKind::QueueError { index, kind })
    }
}
impl fmt::Display for QueueErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
QueueErrorKind::StillActive => f.write_str("still active"),
QueueErrorKind::NotFound => f.write_str("not found"),
}
}
}
impl<T> fmt::Display for ResourceType<T> {
    /// Writes just the name of the resource kind ("Semaphore", "RwLock",
    /// "Channel", "File" or "Gate"); any payload is ignored.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All names are static, so they are written directly instead of being
        // copied into a temporary `String` first.
        let name = match self {
            ResourceType::Semaphore { .. } => "Semaphore",
            ResourceType::RwLock => "RwLock",
            ResourceType::Channel(_) => "Channel",
            ResourceType::File(_) => "File",
            ResourceType::Gate => "Gate",
        };
        f.write_str(name)
    }
}
#[doc(hidden)]
impl<T> Debug for GlommioError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
GlommioError::IoError(err) => write!(f, "{:?}", err),
GlommioError::Closed(resource) => match resource {
ResourceType::Semaphore {
requested,
available,
} => write!(
f,
"Semaphore is closed {{ requested {}, available: {} }}",
requested, available
),
ResourceType::RwLock => write!(f, "RwLock is closed {{ .. }}"),
ResourceType::Channel(_) => write!(f, "Channel is closed {{ .. }}"),
ResourceType::File(msg) => write!(f, "File is closed (\"{}\")", msg),
ResourceType::Gate => write!(f, "Gate is closed"),
},
GlommioError::CanNotBeClosed(resource, str) => match resource {
ResourceType::RwLock => write!(f, "RwLock can not be closed (\"{}\")", *str),
ResourceType::Channel(_) => write!(f, "Channel can not be closed (\"{}\")", *str),
ResourceType::File(msg) => {
write!(f, "File can not be closed : (\"{}\"). (\"{}\")", *str, msg)
}
ResourceType::Gate => write!(f, "Gate can not be closed: {}", *str),
ResourceType::Semaphore {
requested,
available,
} => write!(
f,
"Semaphore can not be closed: {}. {{ requested {}, available: {} }}",
*str, requested, available
),
},
GlommioError::WouldBlock(resource) => match resource {
ResourceType::Semaphore {
requested,
available,
} => write!(
f,
"Semaphore operation would block {{ requested {}, available: {} }}",
requested, available
),
ResourceType::RwLock => write!(f, "RwLock operation would block {{ .. }}"),
ResourceType::Channel(_) => write!(f, "Channel operation would block {{ .. }}"),
ResourceType::File(msg) => write!(f, "File operation would block (\"{}\")", msg),
ResourceType::Gate => write!(f, "Gate operation would block {{ .. }}"),
},
GlommioError::ExecutorError(kind) => match kind {
ExecutorErrorKind::QueueError { index, kind } => f.write_fmt(format_args!(
"QueueError {{ index: {}, kind: {:?} }}",
index, kind
)),
ExecutorErrorKind::InvalidId(x) => {
f.write_fmt(format_args!("Invalid Executor ID {{ id: {} }}", x))
}
},
GlommioError::BuilderError(kind) => match kind {
BuilderErrorKind::NonExistentCpus { cpu } => {
f.write_fmt(format_args!("NonExistentCpus {{ cpu: {} }}", cpu))
}
BuilderErrorKind::InsufficientCpus {
required,
available,
} => f.write_fmt(format_args!(
"InsufficientCpus {{ required: {}, available: {} }}",
required, available
)),
BuilderErrorKind::NrShards { minimum, shards } => f.write_fmt(format_args!(
"NrShards {{ minimum: {}, shards: {} }}",
minimum, shards
)),
BuilderErrorKind::ThreadPanic(_) => write!(f, "Thread panicked {{ .. }}"),
},
GlommioError::EnhancedIoError {
source,
op,
path,
fd,
} => write!(
f,
"EnhancedIoError {{ source: {:?}, op: {:?}, path: {:?}, fd: {:?} }}",
source, op, path, fd
),
GlommioError::ReactorError(kind) => match kind {
ReactorErrorKind::IncorrectSourceType(x) => {
write!(f, "ReactorError {{ kind: IncorrectSourceType {:?} }}", x)
}
ReactorErrorKind::MemLockLimit(a, b) => {
write!(f, "ReactorError {{ kind: MemLockLimit({:?}/{:?}) }}", a, b)
}
},
GlommioError::TimedOut(dur) => write!(f, "TimedOut {{ dur {:?} }}", dur),
}
}
}
impl<T> From<GlommioError<T>> for io::Error {
    /// Converts a [`GlommioError`] into an `io::Error`.
    ///
    /// Mapping of variants to `io::ErrorKind`:
    /// * `IoError` - the wrapped error is returned unchanged.
    /// * `WouldBlock` - `WouldBlock`.
    /// * `Closed`, `CanNotBeClosed` - `BrokenPipe`.
    /// * `ExecutorError` - `Other` (queue still active), `NotFound` (queue
    ///   not found) or `InvalidInput` (invalid executor id).
    /// * `BuilderError` - `Other`.
    /// * `EnhancedIoError` - the kind of the wrapped error; the message is the
    ///   `Display` text of the enhanced error.
    /// * `ReactorError` - `InvalidData` (incorrect source type) or `Other`
    ///   (memlock limit).
    /// * `TimedOut` - `TimedOut`.
    fn from(err: GlommioError<T>) -> Self {
        // Rendered up front because `err` is consumed by the match below.
        let display_err = err.to_string();
        match err {
            GlommioError::IoError(io_err) => io_err,
            GlommioError::WouldBlock(_) => io::Error::new(io::ErrorKind::WouldBlock, display_err),
            GlommioError::Closed(_) => io::Error::new(io::ErrorKind::BrokenPipe, display_err),
            GlommioError::CanNotBeClosed(_, _) => {
                io::Error::new(io::ErrorKind::BrokenPipe, display_err)
            }
            GlommioError::ExecutorError(ExecutorErrorKind::QueueError { index, kind }) => {
                match kind {
                    QueueErrorKind::StillActive => io::Error::new(
                        io::ErrorKind::Other,
                        format!("Queue #{} still active", index),
                    ),
                    QueueErrorKind::NotFound => io::Error::new(
                        io::ErrorKind::NotFound,
                        format!("Queue #{} not found", index),
                    ),
                }
            }
            GlommioError::ExecutorError(ExecutorErrorKind::InvalidId(id)) => io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid executor id {}", id),
            ),
            // `display_err` already starts with "Executor builder error: "
            // (added by the `Display` impl), so it is used as-is; prepending
            // the prefix again would duplicate it in the message.
            GlommioError::BuilderError(BuilderErrorKind::NonExistentCpus { .. })
            | GlommioError::BuilderError(BuilderErrorKind::InsufficientCpus { .. })
            | GlommioError::BuilderError(BuilderErrorKind::NrShards { .. })
            | GlommioError::BuilderError(BuilderErrorKind::ThreadPanic(_)) => {
                io::Error::new(io::ErrorKind::Other, display_err)
            }
            GlommioError::EnhancedIoError { source, .. } => {
                io::Error::new(source.kind(), display_err)
            }
            GlommioError::ReactorError(e) => match e {
                ReactorErrorKind::IncorrectSourceType(x) => io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("IncorrectSourceType {:?}", x),
                ),
                ReactorErrorKind::MemLockLimit(a, b) => io::Error::new(
                    io::ErrorKind::Other,
                    format!("MemLockLimit({:?}/{:?})", a, b),
                ),
            },
            GlommioError::TimedOut(dur) => io::Error::new(
                io::ErrorKind::TimedOut,
                format!("timed out after {:#?}", dur),
            ),
        }
    }
}
// Unit tests for the error messages and the conversions to and from
// `io::Error`.
//
// Most message tests panic with the `Display` text via `panic_any` and rely on
// `#[should_panic(expected = ...)]`, which passes when the panic message
// contains the expected text (a substring match, not an exact comparison).
#[cfg(test)]
mod test {
use std::{io, panic::panic_any};
use super::*;
// `unwrap()` on an `Err` panics with the `Debug` rendering, which wraps the
// file message in escaped double quotes.
#[test]
#[should_panic(
expected = "File operation would block (\"Reading 100 bytes from position 90 would cross \
a buffer boundary (Buffer size 120)\")"
)]
fn extended_file_err_msg_unwrap() {
let err: Result<(), ()> = Err(GlommioError::<()>::WouldBlock(ResourceType::File(format!(
"Reading {} bytes from position {} would cross a buffer boundary (Buffer size {})",
100, 90, 120
))));
err.unwrap();
}
// Same error as above, but checked through `Display`, which does not quote the
// file message.
#[test]
#[should_panic(
expected = "File operation would block (Reading 100 bytes from position 90 would cross a \
buffer boundary (Buffer size 120))"
)]
fn extended_file_err_msg() {
let err: Result<(), ()> = Err(GlommioError::<()>::WouldBlock(ResourceType::File(format!(
"Reading {} bytes from position {} would cross a buffer boundary (Buffer size {})",
100, 90, 120
))));
panic_any(err.unwrap_err().to_string());
}
// `Display` of a closed file includes the file message in parentheses.
#[test]
#[should_panic(expected = "File is closed (Some specific message here with value: 1001)")]
fn extended_closed_file_err_msg() {
let err: Result<(), ()> = Err(GlommioError::<()>::Closed(ResourceType::File(format!(
"Some specific message here with value: {}",
1001
))));
panic_any(err.unwrap_err().to_string());
}
// The queue helpers produce executor errors whose `Display` text names the
// queue index and the failure.
#[test]
#[should_panic(expected = "Queue #0 is still active")]
fn queue_still_active_err_msg() {
let err: Result<(), ()> = Err(GlommioError::queue_still_active(0));
panic_any(err.unwrap_err().to_string());
}
#[test]
#[should_panic(expected = "Queue #0 is not found")]
fn queue_not_found_err_msg() {
let err: Result<(), ()> = Err(GlommioError::queue_not_found(0));
panic_any(err.unwrap_err().to_string());
}
// `Display` text for closed resources. The semaphore message also carries the
// requested/available counts, which the substring match does not check.
#[test]
#[should_panic(expected = "RwLock is closed")]
fn rwlock_closed_err_msg() {
let err: Result<(), ()> = Err(GlommioError::Closed(ResourceType::RwLock));
panic_any(err.unwrap_err().to_string());
}
#[test]
#[should_panic(expected = "Semaphore is closed")]
fn semaphore_closed_err_msg() {
let err: Result<(), ()> = Err(GlommioError::Closed(ResourceType::Semaphore {
requested: 0,
available: 0,
}));
panic_any(err.unwrap_err().to_string());
}
#[test]
#[should_panic(expected = "Channel is closed")]
fn channel_closed_err_msg() {
let err: Result<(), ()> = Err(GlommioError::Closed(ResourceType::Channel(())));
panic_any(err.unwrap_err().to_string());
}
// `Display` text for operations that would block.
#[test]
#[should_panic(expected = "RwLock operation would block")]
fn rwlock_wouldblock_err_msg() {
let err: Result<(), ()> = Err(GlommioError::WouldBlock(ResourceType::RwLock));
panic_any(err.unwrap_err().to_string());
}
#[test]
#[should_panic(expected = "Semaphore operation would block")]
fn semaphore_wouldblock_err_msg() {
let err: Result<(), ()> = Err(GlommioError::WouldBlock(ResourceType::Semaphore {
requested: 0,
available: 0,
}));
panic_any(err.unwrap_err().to_string());
}
#[test]
#[should_panic(expected = "Channel operation would block")]
fn channel_wouldblock_err_msg() {
let err: Result<(), ()> = Err(GlommioError::WouldBlock(ResourceType::Channel(())));
panic_any(err.unwrap_err().to_string());
}
#[test]
#[should_panic(expected = "File operation would block (specific error message here)")]
fn file_wouldblock_err_msg() {
let err: Result<(), ()> = Err(GlommioError::WouldBlock(ResourceType::File(
"specific error message here".to_string(),
)));
panic_any(err.unwrap_err().to_string());
}
// Exercises the `From` conversions in both directions. Nothing is asserted
// about the resulting values; the test only checks that the conversions
// compile and run without panicking.
#[test]
fn composite_error_from_into() {
let err: GlommioError<()> =
io::Error::new(io::ErrorKind::Other, "test other io-error").into();
let _: io::Error = err.into();
let err: GlommioError<()> =
io::Error::new(io::ErrorKind::BrokenPipe, "some error msg").into();
let _: io::Error = err.into();
let channel_closed = GlommioError::Closed(ResourceType::Channel(()));
let _: io::Error = channel_closed.into();
let lock_closed = GlommioError::Closed(ResourceType::RwLock::<()>);
let _: io::Error = lock_closed.into();
let semaphore_closed = GlommioError::Closed(ResourceType::Semaphore::<()> {
requested: 10,
available: 0,
});
let _: io::Error = semaphore_closed.into();
let channel_block = GlommioError::WouldBlock(ResourceType::Channel(()));
let _: io::Error = channel_block.into();
let lock_block = GlommioError::WouldBlock(ResourceType::RwLock::<()>);
let _: io::Error = lock_block.into();
let semaphore_block = GlommioError::WouldBlock(ResourceType::Semaphore::<()> {
requested: 0,
available: 0,
});
let _: io::Error = semaphore_block.into();
}
// `Display` of an enhanced I/O error: the source error text followed by the
// operation, path and file descriptor.
#[test]
fn enhance_error() {
let inner = io::Error::from_raw_os_error(9);
let enhanced = GlommioError::EnhancedIoError::<()> {
source: inner,
op: "testing enhancer",
path: None,
fd: Some(32),
};
let s = format!("{}", enhanced);
assert_eq!(
s,
"Bad file descriptor (os error 9), op: testing enhancer path: None with fd: Some(32)"
);
}
// Helper (not a test): builds an enhanced error and converts it into an
// `io::Error` through the `From<GlommioError<T>>` impl.
fn convert_error() -> io::Result<()> {
let inner = io::Error::from_raw_os_error(9);
let enhanced = GlommioError::<()>::create_enhanced::<PathBuf>(
inner,
"testing enhancer",
None,
Some(32),
);
Err(enhanced.into())
}
// After conversion the `io::Error` carries the enhanced error's `Display` text
// as its inner custom error, so `into_inner()` yields that message.
#[test]
fn enhance_error_converted() {
let io_error = convert_error().unwrap_err();
let s = format!("{}", io_error.into_inner().unwrap());
assert_eq!(
s,
"Bad file descriptor (os error 9), op: testing enhancer path: None with fd: Some(32)"
);
}
}