operation-queue 1.0.0

Helpers for synchronizing asynchronous protocol operations.
Documentation
# Operation queue

Rust helpers for synchronizing asynchronous protocol operations.

## Queueing operations

An operation is any data structure that implements the `QueuedOperation` trait.
`QueuedOperation` implementations can be pushed to the back of an
`OperationQueue`, and they will be performed once all previous operations have
been performed and a runner is available.

```rust
use operation_queue::{OperationQueue, QueuedOperation};

// The number of operations to push to the queue.
const OPERATION_COUNT: usize = 100;

// The number of parallel runners the queue will start. Each runner performs
// one operation at a time.
const RUNNER_COUNT: usize = 100;

struct Operation {
    // Fields...
}

impl QueuedOperation for Operation {
    async fn perform() {
        // Perform the operation...
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let queue = OperationQueue::new(|runner_fut| {
        let _ = tokio::spawn(runner_fut);
    });

    queue
        .start(RUNNER_COUNT)
        .expect("failed to start the queue");

    for i in 0..OPERATION_COUNT {
        let op = Operation { /** ... */ };
        queue
            .enqueue(Box::new(op))
            .await
            .expect("failed to enqueue operation");
    }

    // Do things, or wait for some shutdown signal...

    queue.stop().await;
}
```

## Synchronizing futures

This crate also provides helpers for synchronizing futures across operations
that should only run once at a time. These are located in the `line_token`
module, which requires the `line_token` feature.

An example in which this module can help is handling authentication failures
when sending multiple parallel requests to the same service: one request
encountering this kind of failure usually means the other requests will
encounter it too, and having each request try to re-authenticate at the same
time is wasteful (in the best of cases).

```rust
use operation_queue::line_token::{AcquireOutcome, Line, Token};

struct Request {
    error_handling_line: Line,
    // Other fields...
}

impl Request {
    fn new() -> Request {
        Request {
            error_handling_line: Line::new(),
        }
    }

    async fn perform(&self) -> Result<(), Error> {
        // Keep track of the token; if we've acquired it and dropped it, the
        // line will be released. We want to make sure the error has been
        // properly handled before this happens, which might include retrying
        // the request (and running through multiple iterations of the loop).
        let mut token: Option<Token<'_>> = None;

        let response = loop {
            match self.make_and_send_request() {
                Ok(response) => break response,
                Err(err) => {
                    // Try to acquire the token for this line.
                    // `AcquireOutcome::or_token` allows us to easily
                    // substitute the token if we have alread acquired it in a
                    // previous attempt.
                    token = match self.error_handling_line.try_acquire_token().or_token(token) {
                        // We've successfully acquired the token, let's handle
                        // this error.
                        AcquireOutcome::Success(new_token) => {
                            self.handle_error(err).await?;
                            Some(new_token)
                        }

                        // The token has already been acquired by another
                        // request, let's wait for them to finish and retry.
                        AcquireOutcome::Failure(shared) => {
                            shared.await?;
                            None
                        }
                    };
                }
            }

            // We've encountered an error but it has been dealt with (either
            // by us or by another request). Let's try the request again.
            continue;
        };

        // Do something with the response...

        // We've finished sending the request and processing the response. If
        // we were holding a token, it will now go out of scope and release the
        // line.
        Ok(())
    }
}
```

# Running tests

Some tests in this crate rely on tokio's [unstable
API](https://docs.rs/tokio/latest/tokio/index.html#unstable-features).
Therefore, running them requires the `--cfg tokio_unstable` compiler flag:

```shell
$ RUSTFLAGS="--cfg tokio_unstable" cargo test --all-features
```