//! This crate provides a trait, `Cancellable`, for making long-running service loops cancellable.
//!
//! Let's dive right in with an example. For further details see
//! [`Cancellable`](https://docs.rs/minion/*/minion/trait.Cancellable.html).
//!
//! ```
//! # use minion::*;
//! # use std::{time, thread};
//! # struct Service;
//! # impl Cancellable for Service {
//! # type Error = ();
//! # fn for_each(&mut self) -> Result<LoopState, ()> { Ok(LoopState::Break) }
//! # }
//! # impl Service { fn new() -> Self { Service } }
//! // impl Cancellable for Service { .. }
//! let s = Service::new();
//!
//! // start the service loop on a new thread
//! let h = s.spawn();
//!
//! // get a handle that allows cancelling the service loop
//! let exit = h.canceller();
//!
//! // spin up a new thread that will handle exit signals
//! thread::spawn(move || {
//!     // This might catch Ctrl-C from the user, wait for a particular packet,
//!     // or for any other condition that signals that the service should exit
//!     // cleanly. In this case, we just terminate after a fixed amount of time.
//!     thread::sleep(time::Duration::from_secs(1));
//!
//!     // tell the service loop to exit at the first opportunity
//!     exit.cancel();
//! });
//!
//! // block until the service loop exits or errors.
//! h.wait().unwrap();
//! ```
#![deny(missing_docs)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
/// Indicate whether the main service loop should continue accepting new work.
///
/// This is the success value of [`Cancellable::for_each`]: each iteration reports whether the
/// surrounding loop should go around again or stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopState {
    /// Accept more work.
    Continue,
    /// Stop accepting work and return.
    Break,
}
/// A service loop that can be told to stop accepting new work at any time, and that returns at
/// the first opportunity after being told so.
///
/// Conceptually, an implementor stands in for a loop of this shape:
///
/// ```rust,ignore
/// loop {
///     // fetch some work
///     // do some work that might error
/// }
/// ```
///
/// with the difference that the `loop` can be "cancelled": once the piece of work currently
/// being processed is finished, no further work is picked up and the loop exits.
///
/// There are two ways to drive the loop. [`Cancellable::run`] executes it on the calling thread
/// (and therefore blocks it). [`Cancellable::spawn`] starts a new thread, executes the loop
/// there, and hands back a [`Handle`]. Only loops started with `spawn` can be cancelled.
///
/// The implementation below turns a classic server accept loop into a service loop. When such a
/// loop is started with `spawn` and [`Canceller::cancel`] is called, at most one more connection
/// is accepted before the loop returns, at which point [`Handle::wait`] returns as well.
///
/// ```no_run
/// # extern crate minion;
/// # use minion::*;
/// # use std::{
/// #     io::{self, prelude::*}, net, thread, time,
/// # };
/// struct Service(net::TcpListener);
/// impl Cancellable for Service {
///     type Error = io::Error;
///     fn for_each(&mut self) -> Result<minion::LoopState, Self::Error> {
///         let mut stream = self.0.accept()?.0;
///         write!(stream, "hello!\n")?;
///         Ok(minion::LoopState::Continue)
///     }
/// }
///
/// impl Service {
///     fn new() -> io::Result<Self> {
///         Ok(Service(net::TcpListener::bind("127.0.0.1:6556")?))
///     }
/// }
///
/// fn main() {
/// # fn foo() -> io::Result<()> {
///     Service::new()?.run()?;
/// # Ok(())
/// # }
/// # foo().unwrap();
/// }
/// ```
pub trait Cancellable {
    /// Error type for [`Cancellable::for_each`].
    type Error;

    /// The body of the loop; invoked once per iteration.
    ///
    /// * On `Err`, the surrounding service loop stops and returns that same error; for a
    ///   spawned loop it is what `Handle::wait()` yields.
    /// * On `Ok`, the returned [`LoopState`] decides whether the loop goes around again or
    ///   breaks.
    /// * If this method panics, the panic is propagated to the thread waiting on the loop.
    fn for_each(&mut self) -> Result<LoopState, Self::Error>;

    /// Call [`Cancellable::for_each`] over and over on the current thread, stopping as soon as
    /// it yields an error (which is returned) or [`LoopState::Break`] (which yields `Ok(())`).
    fn run(&mut self) -> Result<(), Self::Error> {
        // `?` bails out with the iteration's error; anything but `Continue` ends the loop.
        while let LoopState::Continue = self.for_each()? {}
        Ok(())
    }

    /// Call [`Cancellable::for_each`] over and over on a freshly spawned thread, and return a
    /// [`Handle`] through which that loop can be cancelled or waited for.
    fn spawn(mut self) -> Handle<Self::Error>
    where
        Self: Sized + Send + 'static,
        Self::Error: Send + 'static,
    {
        // One flag, two owners: the worker polls it, the returned handle clears it.
        let flag = Arc::new(AtomicBool::new(true));
        let alive = Arc::clone(&flag);

        let executor = thread::spawn(move || -> Result<(), Self::Error> {
            loop {
                // The flag is only consulted between iterations, so a `for_each` call that is
                // already in progress is never interrupted.
                if !alive.load(Ordering::Relaxed) {
                    break;
                }
                if let LoopState::Break = self.for_each()? {
                    break;
                }
            }
            Ok(())
        });

        Handle {
            canceller: Canceller { keep_running: flag },
            executor,
        }
    }
}
/// A handle to a running service loop, as returned by [`Cancellable::spawn`].
///
/// You can use it to cancel the running loop at the next opportunity (through
/// [`Canceller::cancel`], which a `Handle` exposes by dereferencing to its [`Canceller`]),
/// or to wait for the loop to terminate (through [`Handle::wait`]). You can also use
/// [`Handle::canceller`] to get a separate [`Canceller`] handle, which lets you terminate the
/// service loop from elsewhere (e.g., while another thread is waiting).
pub struct Handle<E> {
// Holds the flag that the spawned loop checks before each iteration.
canceller: Canceller,
// Join handle of the thread running the loop; joining it yields the loop's final result.
executor: thread::JoinHandle<Result<(), E>>,
}
/// A handle that allows the cancellation of a running service loop.
///
/// Cloning a `Canceller` is cheap and every clone refers to the same underlying flag, so any
/// clone can cancel the loop.
#[derive(Clone, Debug)]
pub struct Canceller {
    /// `true` while the loop should keep going; cleared to request that it stop.
    keep_running: Arc<AtomicBool>,
}
impl<E> Handle<E> {
    /// Get another handle for cancelling the service loop.
    ///
    /// The returned [`Canceller`] shares its flag with this handle, so cancelling through
    /// either one stops the same loop. This can be handy if you want one thread to wait for
    /// the service loop to exit, while another watches for exit signals.
    pub fn canceller(&self) -> Canceller {
        Canceller {
            keep_running: Arc::clone(&self.canceller.keep_running),
        }
    }

    /// Block the current thread waiting for the service loop to exit, and return its result.
    ///
    /// # Errors
    ///
    /// If the service loop returned an error, that error is returned in the `Err` value.
    ///
    /// # Panics
    ///
    /// If the service loop panicked, this method panics with the very same payload.
    pub fn wait(self) -> Result<(), E> {
        match self.executor.join() {
            Ok(result) => result,
            // Re-raise the worker's panic with its original payload. `panic!(payload)` would
            // wrap the `Box<dyn Any + Send>` in a second box (hiding the real panic value from
            // anyone who downcasts it) and is rejected outright in edition 2021.
            Err(payload) => std::panic::resume_unwind(payload),
        }
    }
}
use std::ops::Deref;
impl<E> Deref for Handle<E> {
type Target = Canceller;
fn deref(&self) -> &Self::Target {
&self.canceller
}
}
impl Canceller {
/// Cancel the currently running service loop. This method does not block; it sends a signal
/// that the service loop should cease execution and returns immediately.
///
/// Note that this will *not* interrupt a currently executing [`Cancellable::for_each`].
/// Instead, the next time [`Cancellable::for_each`] *would* be called, the service loop will
/// return.
pub fn cancel(&self) {
self.keep_running.store(false, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        io::{self, prelude::*},
        net, thread,
    };

    /// Test service: greets every accepted TCP connection with `hello!`.
    struct Service(net::TcpListener);

    impl Cancellable for Service {
        type Error = io::Error;

        fn for_each(&mut self) -> Result<LoopState, Self::Error> {
            match self.0.accept() {
                Ok((mut conn, _peer)) => write!(conn, "hello!")?,
                // An interrupted `accept` is not fatal; just go around the loop again.
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
            Ok(LoopState::Continue)
        }
    }

    impl Service {
        /// Bind to an OS-assigned port on localhost.
        fn new() -> Self {
            let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
            Service(listener)
        }

        /// The port the listener ended up bound to.
        fn port(&self) -> u16 {
            let addr = self.0.local_addr().unwrap();
            addr.port()
        }
    }

    /// Connect to the service on `port` and check its greeting.
    ///
    /// Returns `None` when the full greeting was received, or the I/O error that prevented
    /// that. Panics if something other than the expected greeting comes back.
    fn connect_assert(port: u16) -> Option<io::Error> {
        let greeting = net::TcpStream::connect(("127.0.0.1", port)).and_then(|mut conn| {
            let mut text = String::new();
            conn.read_to_string(&mut text)?;
            Ok(text)
        });
        match greeting {
            Ok(text) => {
                assert_eq!(text, "hello!");
                None
            }
            Err(e) => Some(e),
        }
    }

    #[test]
    fn it_runs() {
        let mut service = Service::new();
        let port = service.port();
        thread::spawn(move || service.run().unwrap());

        for _ in 0..2 {
            assert!(connect_assert(port).is_none());
        }
    }

    #[test]
    fn it_cancels() {
        let service = Service::new();
        let port = service.port();
        let handle = service.spawn();

        for _ in 0..2 {
            assert!(connect_assert(port).is_none());
        }

        handle.cancel();

        // cancel will ensure that for_each is not called *again*
        // it will *not* terminate the currently running for_each
        // note that it *may* terminate early if accept() gets interrupted
        let mut served_after_cancel = 0;
        loop {
            if connect_assert(port).is_some() {
                break;
            }
            served_after_cancel += 1;
            assert!(served_after_cancel <= 1);
        }

        // instead of calling for_each again, the loop should now have exited
        handle.wait().unwrap();
    }
}