use std::fmt;
use std::future::Future;
use std::time::Duration;
/// Error returned when a value could not be handed to the command channel.
///
/// The rejected value is carried in field `0` so the caller can recover it.
///
/// Note: [`CommandSender::try_send`] produces this error both when the channel
/// is closed and when it is full, even though the `Display` text only mentions
/// the closed case.
///
/// `PartialEq`/`Eq` are derived (when `T` supports them) so that send results
/// can be compared directly, matching the other error types in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed message; the payload is intentionally not printed, so
    /// `T` does not need to implement `Display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("command channel closed")
    }
}

// `Error` requires `Debug`, which the derive only provides when `T: Debug`.
impl<T> std::error::Error for SendError<T> where T: fmt::Debug {}
/// Reason a non-blocking receive on the command channel produced no value.
///
/// The variants mirror tokio's `mpsc::error::TryRecvError`, which
/// `CommandReceiver::try_recv` translates into this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// Mapped from tokio's `TryRecvError::Empty`.
    Empty,
    /// Mapped from tokio's `TryRecvError::Disconnected`.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    /// Writes a short, fixed description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match *self {
            TryRecvError::Empty => "command channel is empty",
            TryRecvError::Disconnected => "command channel disconnected",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TryRecvError {}
pub struct CommandSender<T>(tokio::sync::mpsc::Sender<T>);
impl<T> Clone for CommandSender<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> CommandSender<T> {
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
self.0.send(value).await.map_err(|e| SendError(e.0))
}
pub fn try_send(&self, value: T) -> Result<(), SendError<T>> {
self.0.try_send(value).map_err(|e| match e {
tokio::sync::mpsc::error::TrySendError::Full(v)
| tokio::sync::mpsc::error::TrySendError::Closed(v) => SendError(v),
})
}
}
pub struct CommandReceiver<T>(tokio::sync::mpsc::Receiver<T>);
impl<T> CommandReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
self.0.recv().await
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.0.try_recv().map_err(|e| match e {
tokio::sync::mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
tokio::sync::mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
})
}
}
/// Creates a bounded command channel and returns its two halves.
///
/// A requested `capacity` of `0` is raised to `1` before the tokio channel is
/// created (tokio's bounded channel documents a panic for a zero capacity).
pub fn command_channel<T>(capacity: usize) -> (CommandSender<T>, CommandReceiver<T>) {
    let slots = if capacity == 0 { 1 } else { capacity };
    let (sender, receiver) = tokio::sync::mpsc::channel(slots);
    (CommandSender(sender), CommandReceiver(receiver))
}
/// Error value indicating that a deadline passed; returned by [`timeout`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed;

impl fmt::Display for Elapsed {
    /// Writes a fixed description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MESSAGE: &str = "deadline elapsed";
        f.write_str(MESSAGE)
    }
}

impl std::error::Error for Elapsed {}
/// Awaits `fut` under `tokio::time::timeout` with the given `duration`.
///
/// # Errors
///
/// Returns [`Elapsed`] when tokio's timeout reports an error; tokio's own
/// error value is discarded.
pub async fn timeout<F: Future>(duration: Duration, fut: F) -> Result<F::Output, Elapsed> {
    match tokio::time::timeout(duration, fut).await {
        Ok(output) => Ok(output),
        Err(_) => Err(Elapsed),
    }
}
/// Suspends the calling task for `duration` by awaiting `tokio::time::sleep`.
pub async fn sleep(duration: Duration) {
    let timer = tokio::time::sleep(duration);
    timer.await;
}
/// Spawns a thread that drives a future to completion on its own runtime.
///
/// The new thread builds a current-thread tokio runtime with all drivers
/// enabled, calls `make_fut` to create the future, and blocks on it. Because
/// the future is created on the new thread, only `make_fut` has to be `Send`;
/// the future itself does not.
///
/// # Panics
///
/// The spawned thread panics with "failed to build driver runtime" if the
/// runtime cannot be built.
pub fn run_thread<F, Fut>(make_fut: F) -> std::thread::JoinHandle<()>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = ()> + 'static,
{
    let driver = move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build driver runtime");
        // The future is created only after the runtime exists, as before.
        runtime.block_on(make_fut());
    };
    std::thread::spawn(driver)
}
/// Spawns a thread called `name` that drives a future to completion on its
/// own runtime.
///
/// The new thread builds a current-thread tokio runtime with all drivers
/// enabled, calls `make_fut` to create the future, and blocks on it. Because
/// the future is created on the new thread, only `make_fut` has to be `Send`;
/// the future itself does not.
///
/// # Panics
///
/// Panics in the caller with "failed to spawn driver thread" if the thread
/// cannot be spawned. The spawned thread panics with "failed to build driver
/// runtime" if the runtime cannot be built.
pub fn run_thread_named<F, Fut>(name: &str, make_fut: F) -> std::thread::JoinHandle<()>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = ()> + 'static,
{
    let driver = move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build driver runtime");
        // The future is created only after the runtime exists, as before.
        runtime.block_on(make_fut());
    };
    let builder = std::thread::Builder::new().name(name.to_string());
    builder.spawn(driver).expect("failed to spawn driver thread")
}