Expand description
This crate defines a channel for requesting and receiving data. Each channel has only one requesting end, but it can have multiple responding ends. It is useful for implementing work sharing.
The two ends of the channel are asynchronous with respect to each other, so it is kinda nonblocking. However, if multiple responding ends try to respond to the same request, only one will succeed; the rest will return errors.
§Design
§Overview
reqchan
is built around the two halves of the channel: Requester
and Responder
. Both implement methods, Requester::try_request()
and
Responder::try_respond()
, that, when succesful, lock their corresponding
side of the channel and return contracts. RequestContract
requires the
user to either successfully receive a datum or cancel the request.
ResponseContract
requires the user to send a datum. These requirements
prevent the system from losing data sent through the channel.
§Locking
Responder::try_response()
locks the responding side to prevent other
potential responders from responding to the same request. However,
Requester::try_request()
locks the requesting side of the channel
to prevent the user from trying to issue multiple outstanding requests.
Both locks are dropped when their corresponding contract object is dropped.
§Contracts
Requester::try_request()
has to issue a RequestContract
so the
thread of execution does not block waiting for a response. However,
that reason does not apply to Responder::try_response()
. I originally
made Responder::try_response()
send the datum. However, that required
the user to have the datum available to send even if it could not be sent,
and it required the user to handle the returned datum if it could not be
sent. If the datum was, say, half the contents of a Vec
, this might entail
lots of expensive memory allocation. Therefore, I made Responder::try_response()
return a ResponseContract
indicating that the responder could and would
respond to the request. This way the user only has to perform the necessary
steps to send the datum if the datum must be sent.
§Examples
§Simple Example
This simple, single-threaded example demonstrates most of the API.
The only thing left out is RequestContract::try_cancel()
.
extern crate reqchan;
// Create channel.
let (requester, responder) = reqchan::channel::<u32>();
// Issue request.
let mut request_contract = requester.try_request().unwrap();
// Respond with number.
responder.try_respond().unwrap().send(5);
// Receive and print number.
println!("Number is {}", request_contract.try_receive().unwrap());
§More Complex Example
This more complex example demonstrates more “real-world” usage. One thread requests a ‘task’ (i.e. a closure to run), and the other two threads fight over who gets to respond with their own personal task. Meanwhile, the requesting thread is polling for a task, and if it gets one in time, it runs it. Regardless of whether or not the receiver got a task or timed out, the receiver notifies other threads to stop running, and stops itself.
extern crate reqchan as chan;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};
// Stuff to make it easier to pass around closures.
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Task = Box<FnBox + Send + 'static>;
// Variable used to test calling a `Task` sent between threads.
let test_var = Arc::new(AtomicUsize::new(0));
let test_var2 = test_var.clone();
let test_var3 = test_var.clone();
// Variable needed to stop `responder` thread if `requester` times out
let should_exit = Arc::new(AtomicBool::new(false));
let should_exit_copy_1 = should_exit.clone();
let should_exit_copy_2 = should_exit.clone();
let (requester, responder) = chan::channel::<Task>();
let responder2 = responder.clone();
// requesting thread
let requester_handle = thread::spawn(move || {
let start_time = Instant::now();
let timeout = Duration::new(0, 1000000);
let mut contract = requester.try_request().unwrap();
loop {
// Try to cancel request and stop threads if runtime
// has exceeded `timeout`.
if start_time.elapsed() >= timeout {
// Try to cancel request.
// This should only fail if `responder` has started responding.
if contract.try_cancel() {
// Notify other threads to stop.
should_exit.store(true, Ordering::SeqCst);
break;
}
}
// Try getting `task` from `responder`.
match contract.try_receive() {
// `contract` received `task`.
Ok(task) => {
task.call_box();
// Notify other threads to stop.
should_exit.store(true, Ordering::SeqCst);
break;
},
// Continue looping if `responder` has not yet sent `task`.
Err(chan::TryReceiveError::Empty) => {},
// The only other error is `chan::TryReceiveError::Done`.
// This only happens if we call `contract.try_receive()`
// after either receiving data or cancelling the request.
_ => unreachable!(),
}
}
});
// responding thread 1
let responder_1_handle = thread::spawn(move || {
let mut tasks = vec![Box::new(move || {
test_var2.fetch_add(1, Ordering::SeqCst);
}) as Task];
loop {
// Exit loop if `receiver` has timed out.
if should_exit_copy_1.load(Ordering::SeqCst) {
break;
}
// Send `task` to `receiver` if it has issued a request.
match responder2.try_respond() {
// `responder2` can respond to request.
Ok(contract) => {
contract.send(tasks.pop().unwrap());
break;
},
// Either `requester` has not yet made a request,
// or `responder2` already handled the request.
Err(chan::TryRespondError::NoRequest) => {},
// `responder2` is processing request..
Err(chan::TryRespondError::Locked) => { break; },
}
}
});
// responding thread 2
let responder_2_handle = thread::spawn(move || {
let mut tasks = vec![Box::new(move || {
test_var3.fetch_add(2, Ordering::SeqCst);
}) as Task];
loop {
// Exit loop if `receiver` has timed out.
if should_exit_copy_2.load(Ordering::SeqCst) {
break;
}
// Send `task` to `receiver` if it has issued a request.
match responder.try_respond() {
// `responder2` can respond to request.
Ok(contract) => {
contract.send(tasks.pop().unwrap());
break;
},
// Either `requester` has not yet made a request,
// or `responder` already handled the request.
Err(chan::TryRespondError::NoRequest) => {},
// `responder` is processing request.
Err(chan::TryRespondError::Locked) => { break; },
}
}
});
requester_handle.join().unwrap();
responder_1_handle.join().unwrap();
responder_2_handle.join().unwrap();
// `num` can be 0, 1 or 2.
let num = test_var.load(Ordering::SeqCst);
println!("Number is {}", num);
Structs§
- Request
Contract - This is the contract returned by a successful
Requester::try_request()
. It represents the caller’s exclusive access to the requesting side of the channel. The user can either try to get a datum from the responding side or attempt to cancel the request. To prevent data loss,RequestContract
will panic if the user has not received a datum or cancelled the request. - Requester
- This end of the channel requests and receives data from its
Responder
(s). - Responder
- This end of the channel sends data in response to requests from
its
Requester
. - Response
Contract - This is the contract returned by a successful
Responder::try_response()
. It represents the caller’s exclusive access to the responding side of the channel. It ensures the user sends a datum by panicking if they have not.
Enums§
Functions§
- channel
- This function creates a
reqchan
and returns a tuple containing the two ends of this bidirectional request->response channel.