reqchan/lib.rs
1//! This crate defines a channel for requesting and receiving data. Each
2//! channel has only one requesting end, but it can have multiple responding
3//! ends. It is useful for implementing work sharing.
4//!
//! The two ends of the channel are asynchronous with respect to each other,
//! so neither end blocks waiting on the other. However, if multiple responding
//! ends try to respond to the same request, only one will succeed; the rest
//! will return errors.
9//!
10//! # Design
11//!
12//! ## Overview
13//!
//! `reqchan` is built around the two halves of the channel: `Requester`
//! and `Responder`. Both implement methods, `Requester::try_request()` and
//! `Responder::try_respond()`, that, when successful, lock their corresponding
//! side of the channel and return contracts. `RequestContract` **requires** the
//! user to either successfully receive a datum or cancel the request.
//! `ResponseContract` requires the user to send a datum. These requirements
//! prevent the system from losing data sent through the channel.
21//!
22//! ## Locking
23//!
//! `Responder::try_respond()` locks the responding side to prevent other
//! potential responders from responding to the same request. Likewise,
//! `Requester::try_request()` locks the requesting side of the channel
//! to prevent the user from issuing multiple outstanding requests.
//! Both locks are released when their corresponding contract object is dropped.
29//!
30//! ## Contracts
31//!
//! `Requester::try_request()` has to issue a `RequestContract` so the
//! thread of execution does not block waiting for a response. However,
//! that reason does not apply to `Responder::try_respond()`. I originally
//! made `Responder::try_respond()` send the datum. However, that required
//! the user to have the datum available to send even if it could not be sent,
//! and it required the user to handle the returned datum if it could not be
//! sent. If the datum was, say, half the contents of a `Vec`, this might entail
//! lots of expensive memory allocation. Therefore, I made `Responder::try_respond()`
//! return a `ResponseContract` indicating that the responder *could* and *would*
//! respond to the request. This way the user only has to perform the necessary
//! steps to send the datum if the datum must be sent.
43//!
44//! # Examples
45//!
46//! ## Simple Example
47//!
48//! This simple, single-threaded example demonstrates most of the API.
49//! The only thing left out is `RequestContract::try_cancel()`.
50//!
51//! ```
52//! extern crate reqchan;
53//!
54//! // Create channel.
55//! let (requester, responder) = reqchan::channel::<u32>();
56//!
57//! // Issue request.
58//! let mut request_contract = requester.try_request().unwrap();
59//!
60//! // Respond with number.
61//! responder.try_respond().unwrap().send(5);
62//!
63//! // Receive and print number.
64//! println!("Number is {}", request_contract.try_receive().unwrap());
65//! ```
66//!
67//! ## More Complex Example
68//!
69//! This more complex example demonstrates more "real-world" usage.
70//! One thread requests a 'task' (i.e. a closure to run), and the
71//! other two threads fight over who gets to respond with their
72//! own personal task. Meanwhile, the requesting thread is polling
73//! for a task, and if it gets one in time, it runs it. Regardless of
74//! whether or not the receiver got a task or timed out, the receiver
75//! notifies other threads to stop running, and stops itself.
76//!
77//! ```
78//! extern crate reqchan as chan;
79//!
80//! use std::sync::Arc;
81//! use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
82//! use std::thread;
83//! use std::time::{Duration, Instant};
84//!
85//! // Stuff to make it easier to pass around closures.
86//! trait FnBox {
87//! fn call_box(self: Box<Self>);
88//! }
89//! impl<F: FnOnce()> FnBox for F {
90//! fn call_box(self: Box<F>) {
91//! (*self)()
92//! }
93//! }
94//! type Task = Box<FnBox + Send + 'static>;
95//!
96//! // Variable used to test calling a `Task` sent between threads.
97//! let test_var = Arc::new(AtomicUsize::new(0));
98//! let test_var2 = test_var.clone();
99//! let test_var3 = test_var.clone();
100//!
101//! // Variable needed to stop `responder` thread if `requester` times out
102//! let should_exit = Arc::new(AtomicBool::new(false));
103//! let should_exit_copy_1 = should_exit.clone();
104//! let should_exit_copy_2 = should_exit.clone();
105//!
106//! let (requester, responder) = chan::channel::<Task>();
107//! let responder2 = responder.clone();
108//!
109//! // requesting thread
110//! let requester_handle = thread::spawn(move || {
111//! let start_time = Instant::now();
112//! let timeout = Duration::new(0, 1000000);
113//!
114//! let mut contract = requester.try_request().unwrap();
115//!
116//! loop {
117//! // Try to cancel request and stop threads if runtime
118//! // has exceeded `timeout`.
119//! if start_time.elapsed() >= timeout {
120//! // Try to cancel request.
121//! // This should only fail if `responder` has started responding.
122//! if contract.try_cancel() {
123//! // Notify other threads to stop.
124//! should_exit.store(true, Ordering::SeqCst);
125//! break;
126//! }
127//! }
128//!
129//! // Try getting `task` from `responder`.
130//! match contract.try_receive() {
131//! // `contract` received `task`.
132//! Ok(task) => {
133//! task.call_box();
134//! // Notify other threads to stop.
135//! should_exit.store(true, Ordering::SeqCst);
136//! break;
137//! },
138//! // Continue looping if `responder` has not yet sent `task`.
139//! Err(chan::TryReceiveError::Empty) => {},
140//! // The only other error is `chan::TryReceiveError::Done`.
141//! // This only happens if we call `contract.try_receive()`
142//! // after either receiving data or cancelling the request.
143//! _ => unreachable!(),
144//! }
145//! }
146//! });
147//!
148//! // responding thread 1
149//! let responder_1_handle = thread::spawn(move || {
150//! let mut tasks = vec![Box::new(move || {
151//! test_var2.fetch_add(1, Ordering::SeqCst);
152//! }) as Task];
153//!
154//! loop {
155//! // Exit loop if `receiver` has timed out.
156//! if should_exit_copy_1.load(Ordering::SeqCst) {
157//! break;
158//! }
159//!
160//! // Send `task` to `receiver` if it has issued a request.
161//! match responder2.try_respond() {
162//! // `responder2` can respond to request.
163//! Ok(contract) => {
164//! contract.send(tasks.pop().unwrap());
165//! break;
166//! },
167//! // Either `requester` has not yet made a request,
168//! // or `responder2` already handled the request.
169//! Err(chan::TryRespondError::NoRequest) => {},
//! // The other responder (`responder`) is already processing the request.
171//! Err(chan::TryRespondError::Locked) => { break; },
172//! }
173//! }
174//! });
175//!
176//! // responding thread 2
177//! let responder_2_handle = thread::spawn(move || {
178//! let mut tasks = vec![Box::new(move || {
179//! test_var3.fetch_add(2, Ordering::SeqCst);
180//! }) as Task];
181//!
182//! loop {
183//! // Exit loop if `receiver` has timed out.
184//! if should_exit_copy_2.load(Ordering::SeqCst) {
185//! break;
186//! }
187//!
188//! // Send `task` to `receiver` if it has issued a request.
189//! match responder.try_respond() {
//! // `responder` can respond to request.
191//! Ok(contract) => {
192//! contract.send(tasks.pop().unwrap());
193//! break;
194//! },
195//! // Either `requester` has not yet made a request,
196//! // or `responder` already handled the request.
197//! Err(chan::TryRespondError::NoRequest) => {},
//! // The other responder (`responder2`) is already processing the request.
199//! Err(chan::TryRespondError::Locked) => { break; },
200//! }
201//! }
202//! });
203//!
204//! requester_handle.join().unwrap();
205//! responder_1_handle.join().unwrap();
206//! responder_2_handle.join().unwrap();
207//!
208//! // `num` can be 0, 1 or 2.
209//! let num = test_var.load(Ordering::SeqCst);
210//! println!("Number is {}", num);
211//! ```
212
213use std::cell::UnsafeCell;
214use std::sync::Arc;
215use std::sync::atomic::{AtomicBool, Ordering};
216
217/// This function creates a `reqchan` and returns a tuple containing the
218/// two ends of this bidirectional request->response channel.
219///
220/// # Example
221///
222/// ```
223/// extern crate reqchan;
224///
225/// #[allow(unused_variables)]
226/// let (requester, responder) = reqchan::channel::<u32>();
227/// ```
228pub fn channel<T>() -> (Requester<T>, Responder<T>) {
229 let inner = Arc::new(Inner {
230 has_request_lock: AtomicBool::new(false),
231 has_response_lock: AtomicBool::new(false),
232 has_request: AtomicBool::new(false),
233 has_datum: AtomicBool::new(false),
234 datum: UnsafeCell::new(None),
235 });
236
237 (
238 Requester { inner: inner.clone() },
239 Responder { inner: inner.clone() },
240 )
241}
242
243/// This end of the channel requests and receives data from its `Responder`(s).
244pub struct Requester<T> {
245 inner: Arc<Inner<T>>,
246}
247
248impl<T> Requester<T> {
249 /// This methods tries to request item(s) from one or more `Responder`(s).
250 /// If successful, it returns a `RequestContract` to either poll for data or
251 /// cancel the request.
252 ///
253 /// # Warning
254 ///
255 /// Only **one** `RequestContract` may be active at a time.
256 ///
257 /// # Example
258 ///
259 /// ```
260 /// extern crate reqchan as chan;
261 ///
262 /// let (requester, responder) = chan::channel::<u32>();
263 ///
264 /// // Create request.
265 /// let mut request_contract = requester.try_request().unwrap();
266 ///
267 /// // We have to wait for `request_contract` to go out of scope
268 /// // before we can make another request.
269 /// // match requester.try_request() {
270 /// // Err(chan::TryRequestError::Locked) => {
271 /// // println!("We already have a request contract!");
272 /// // },
273 /// // _ => unreachable!(),
274 /// // }
275 ///
276 /// responder.try_respond().unwrap().send(5);
277 /// println!("Got number {}", request_contract.try_receive().unwrap());
278 /// ```
279 pub fn try_request(&self) -> Result<RequestContract<T>, TryRequestError> {
280 // First, try to lock the requesting side.
281 if !self.inner.try_lock_request() {
282 return Err(TryRequestError::Locked);
283 }
284
285 // Next, flag a request.
286 self.inner.flag_request();
287
288 // Then return a `RequestContract`.
289 Ok(RequestContract {
290 inner: self.inner.clone(),
291 done: false,
292 })
293 }
294}
295
296/// This is the contract returned by a successful `Requester::try_request()`.
297/// It represents the caller's exclusive access to the requesting side of
298/// the channel. The user can either try to get a datum from the responding side
299/// or *attempt* to cancel the request. To prevent data loss, `RequestContract`
300/// will panic if the user has not received a datum or cancelled the request.
301pub struct RequestContract<T> {
302 inner: Arc<Inner<T>>,
303 done: bool,
304}
305
306impl<T> RequestContract<T> {
307 /// This method attempts to receive a datum from one or more responder(s).
308 ///
309 /// # Warning
310 ///
311 /// It returns `Err(TryReceiveError::Done)` if the user called it
312 /// after either receiving a datum or cancelling the request.
313 ///
314 /// # Example
315 ///
316 /// ```
317 /// extern crate reqchan as chan;
318 ///
319 /// let (requester, responder) = chan::channel::<u32>();
320 ///
321 /// let mut request_contract = requester.try_request().unwrap();
322 ///
323 /// // The responder has not responded yet.
324 /// match request_contract.try_receive() {
325 /// Err(chan::TryReceiveError::Empty) => { println!("No Data yet!"); },
326 /// _ => unreachable!(),
327 /// }
328 ///
329 /// responder.try_respond().unwrap().send(6);
330 ///
331 /// // The responder has responded now.
332 /// match request_contract.try_receive() {
333 /// Ok(num) => { println!("Number: {}", num); },
334 /// _ => unreachable!(),
335 /// }
336 ///
337 /// // We need to issue another request to receive more data.
338 /// match request_contract.try_receive() {
339 /// Err(chan::TryReceiveError::Done) => {
340 /// println!("We already received data!");
341 /// },
342 /// _ => unreachable!(),
343 /// }
344 /// ```
345 pub fn try_receive(&mut self) -> Result<T, TryReceiveError> {
346 // Do not try to receive anything if the contract already received data.
347 if self.done {
348 return Err(TryReceiveError::Done);
349 }
350
351 match self.inner.try_get_datum() {
352 Ok(datum) => {
353 self.done = true;
354 Ok(datum)
355 },
356 Err(err) => Err(err),
357 }
358 }
359
360 /// This method attempts to cancel a request. This is useful for
361 /// implementing a timeout.
362 ///
363 /// # Return
364 ///
365 /// * `true` - Cancelled request
366 /// * `false` - `Responder` started processing request first
367 ///
368 /// # Warning
369 ///
370 /// It also returns `false` if the user called it after
371 /// either receiving a datum or cancelling the request.
372 ///
373 /// # Example
374 ///
375 /// ```
376 /// extern crate reqchan as chan;
377 ///
378 /// let (requester, responder) = chan::channel::<u32>();
379 ///
380 /// {
381 /// let mut contract = requester.try_request().unwrap();
382 ///
383 /// // We can cancel the request since `responder` has not
384 /// // yet responded to it.
385 /// assert_eq!(contract.try_cancel(), true);
386 ///
387 /// // Both contracts go out of scope here
388 /// }
389 ///
390 /// {
391 /// let mut request_contract = requester.try_request().unwrap();
392 ///
393 /// responder.try_respond().unwrap().send(7);
394 ///
395 /// // It is too late to cancel the request!
396 /// if !request_contract.try_cancel() {
397 /// println!("Number: {}", request_contract.try_receive().unwrap());
398 /// }
399 ///
400 /// // Both contracts go out of scope here
401 /// }
402 /// ```
403 pub fn try_cancel(&mut self) -> bool {
404 // Do not try to unsend if the contract already received data.
405 if self.done {
406 return false;
407 }
408
409 match self.inner.try_unflag_request() {
410 true => {
411 self.done = true;
412 true
413 },
414 false => false,
415 }
416 }
417}
418
419impl<T> Drop for RequestContract<T> {
420 fn drop(&mut self) {
421 if !self.done {
422 panic!("Dropping RequestContract without receiving data!");
423 }
424
425 self.inner.unlock_request();
426 }
427}
428
429/// This end of the channel sends data in response to requests from
430/// its `Requester`.
431pub struct Responder<T> {
432 inner: Arc<Inner<T>>,
433}
434
435impl<T> Responder<T> {
436 /// This method signals the intent of `Responder` to respond to a request.
437 /// If successful, it returns a `ResponseContract` to ensure the user sends
438 /// a datum.
439 ///
440 /// # Warning
441 ///
442 /// Only **one** `ResponseContract` may be active at a time.
443 ///
444 /// # Example
445 ///
446 /// ```
447 /// extern crate reqchan as chan;
448 ///
449 /// let (requester, responder) = chan::channel::<u32>();
450 ///
451 /// // `requester` has not yet issued a request.
452 /// match responder.try_respond() {
453 /// Err(chan::TryRespondError::NoRequest) => {
454 /// println!("There is no request!");
455 /// },
456 /// _ => unreachable!(),
457 /// }
458 ///
459 /// let mut request_contract = requester.try_request().unwrap();
460 ///
461 /// // `requester` has issued a request.
462 /// let mut response_contract = responder.try_respond().unwrap();
463 ///
464 /// // We cannot issue another response to the request.
465 /// match responder.try_respond() {
466 /// Err(chan::TryRespondError::Locked) => {
467 /// println!("We cannot issue multiple responses to a request!");
468 /// },
469 /// _ => unreachable!(),
470 /// }
471 ///
472 /// response_contract.send(8);
473 ///
474 /// println!("Number is {}", request_contract.try_receive().unwrap());
475 /// ```
476 pub fn try_respond(&self) -> Result<ResponseContract<T>,
477 TryRespondError> {
478
479 // First try to lock the responding side.
480 if !self.inner.try_lock_response() {
481 return Err(TryRespondError::Locked);
482 }
483
484 // Next, atomically check for a request and signal a response to it.
485 // If no request exists, drop the lock and return the data.
486 if !self.inner.try_unflag_request() {
487 self.inner.unlock_response();
488 return Err(TryRespondError::NoRequest);
489 }
490
491 Ok(ResponseContract {
492 inner: self.inner.clone(),
493 done: false,
494 })
495 }
496}
497
498impl<T> Clone for Responder<T> {
499 fn clone(&self) -> Self {
500 Responder {
501 inner: self.inner.clone(),
502 }
503 }
504}
505
506/// This is the contract returned by a successful `Responder::try_response()`.
507/// It represents the caller's exclusive access to the responding side of
508/// the channel. It ensures the user sends a datum by panicking if they have not.
509pub struct ResponseContract<T> {
510 inner: Arc<Inner<T>>,
511 done: bool,
512}
513
514impl<T> ResponseContract<T> {
515 /// This method tries to send a datum to the requesting end of the channel.
516 /// It will then consume itself, thereby freeing the responding side of
517 /// the channel.
518 ///
519 /// # Arguments
520 ///
521 /// * `datum` - The item(s) to send
522 ///
523 /// # Example
524 ///
525 /// ```
526 /// extern crate reqchan as chan;
527 ///
528 /// let (requester, responder) = chan::channel::<u32>();
529 ///
530 /// let mut request_contract = requester.try_request().unwrap();
531 ///
532 /// let mut response_contract = responder.try_respond().unwrap();
533 ///
534 /// // We send data to the requesting end.
535 /// response_contract.send(9);
536 ///
537 /// println!("Number is {}", request_contract.try_receive().unwrap());
538 /// ```
539 pub fn send(mut self, datum: T) {
540 self.inner.set_datum(datum);
541 self.done = true;
542 }
543}
544
545impl<T> Drop for ResponseContract<T> {
546 fn drop(&mut self) {
547 if !self.done {
548 panic!("Dropping ResponseContract without sending data!");
549 }
550
551 self.inner.unlock_response();
552 }
553}
554
555#[doc(hidden)]
556struct Inner<T> {
557 has_request_lock: AtomicBool,
558 has_response_lock: AtomicBool,
559 has_request: AtomicBool,
560 has_datum: AtomicBool,
561 datum: UnsafeCell<Option<T>>,
562}
563
564unsafe impl<T> Sync for Inner<T> {}
565
566#[doc(hidden)]
567impl<T> Inner<T> {
568 /// This method indicates that the requesting side has made a request.
569 ///
570 /// # Warning
571 ///
572 /// **ONLY** the requesting side of the channel should call it.
573 ///
574 /// # Invariant
575 ///
576 /// * self.has_request_lock == true
577 #[inline]
578 fn flag_request(&self) {
579 self.has_request.store(true, Ordering::SeqCst);
580 }
581
582 /// This method atomically checks to see if the requesting end
583 /// issued a request and unflag the request.
584 #[inline]
585 fn try_unflag_request(&self) -> bool {
586 let (old, new) = (true, false);
587
588 self.has_request.compare_and_swap(old,
589 new,
590 Ordering::SeqCst) == old
591 }
592
593 /// This method sets the inner datum to the specified value.
594 ///
595 /// # Arguments
596 ///
597 /// * datum - The datum to set
598 ///
599 /// # Warning
600 ///
601 /// **ONLY** the responding side of the channel should call it.
602 ///
603 /// # Invariant
604 ///
605 /// * self.has_response_lock == true
606 ///
607 /// * (*self.datum.get()).is_none() == true
608 ///
609 /// * self.has_datum == false
610 #[inline]
611 fn set_datum(&self, data: T) {
612 // First update inner datum.
613 unsafe {
614 *self.datum.get() = Some(data);
615 }
616
617 // Then indicate the presence of a new datum.
618 self.has_datum.store(true, Ordering::SeqCst);
619 }
620
621 /// This method tries to get the datum out of `Inner`.
622 ///
623 /// # Warning
624 ///
625 /// **ONLY** the requesting side of the channel should call it.
626 ///
627 /// # Invariant
628 ///
629 /// * self.has_request_lock == true
630 ///
631 /// * if self.has_datum == true then (*self.datum.get()).is_some() == true
632 #[inline]
633 fn try_get_datum(&self) -> Result<T, TryReceiveError> {
634 // First check to see if data exists.
635 let (old, new) = (true, false);
636
637 if self.has_datum.compare_and_swap(old,
638 new,
639 Ordering::SeqCst) == old {
640 // If so, retrieve the data and unwrap it from its Option container.
641 unsafe {
642 Ok((*self.datum.get()).take().unwrap())
643 }
644 }
645 else {
646 Err(TryReceiveError::Empty)
647 }
648 }
649
650 /// This method tries to lock the requesting side of the channel.
651 /// It returns a `boolean` indicating whether or not it succeeded.
652 #[inline]
653 fn try_lock_request(&self) -> bool {
654 let (old, new) = (false, true);
655
656 self.has_request_lock.compare_and_swap(old, new, Ordering::SeqCst) == old
657 }
658
659 /// This method unlocks the requesting side of the channel.
660 #[inline]
661 fn unlock_request(&self) {
662 self.has_request_lock.store(false, Ordering::SeqCst);
663 }
664
665 /// This method tries to lock the responding side of the channel.
666 /// It returns a `boolean` indicating whether or not it succeeded.
667 #[inline]
668 fn try_lock_response(&self) -> bool {
669 let (old, new) = (false, true);
670
671 self.has_response_lock.compare_and_swap(old, new, Ordering::SeqCst) == old
672 }
673
674 /// This method unlocks the responding side of the channel.
675 #[inline]
676 fn unlock_response(&self) {
677 self.has_response_lock.store(false, Ordering::SeqCst);
678 }
679}
680
/// Error returned by `Requester::try_request()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRequestError {
    /// The requesting side of the channel is already locked.
    Locked,
}

impl std::fmt::Display for TryRequestError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match *self {
            TryRequestError::Locked => f.write_str("requesting side of the channel is locked"),
        }
    }
}

impl std::error::Error for TryRequestError {}
685
/// Error returned by `RequestContract::try_receive()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryReceiveError {
    /// No datum is available yet.
    Empty,
    /// The contract already received a datum or the request was cancelled.
    Done,
}

impl std::fmt::Display for TryReceiveError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let message = match *self {
            TryReceiveError::Empty => "no datum has been sent yet",
            TryReceiveError::Done => "request contract is already completed",
        };

        f.write_str(message)
    }
}

impl std::error::Error for TryReceiveError {}
691
/// Error returned by `Responder::try_respond()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRespondError {
    /// There is no outstanding request to respond to.
    NoRequest,
    /// The responding side of the channel is already locked.
    Locked,
}

impl std::fmt::Display for TryRespondError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let message = match *self {
            TryRespondError::NoRequest => "no request is pending",
            TryRespondError::Locked => "responding side of the channel is locked",
        };

        f.write_str(message)
    }
}

impl std::error::Error for TryRespondError {}
697
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Lets a boxed `FnOnce` closure be called through a trait object.
    trait FnBox {
        fn call_box(self: Box<Self>);
    }

    impl<F: FnOnce()> FnBox for F {
        fn call_box(self: Box<F>) {
            (*self)()
        }
    }

    type Task = Box<dyn FnBox + Send + 'static>;

    #[test]
    fn test_channel() {
        let (_rqst, _resp) = channel::<Task>();
    }

    #[test]
    fn test_inner_try_lock_request() {
        let (rqst, resp) = channel::<Task>();

        assert!(rqst.inner.try_lock_request());
        assert!(resp.inner.has_request_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_request_multiple() {
        let (rqst, _resp) = channel::<Task>();

        rqst.inner.try_lock_request();

        assert!(!rqst.inner.try_lock_request());
    }

    #[test]
    fn test_inner_try_unlock_request() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.has_request_lock.store(true, Ordering::SeqCst);

        rqst.inner.unlock_request();

        assert!(!resp.inner.has_request_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_response() {
        let (rqst, resp) = channel::<Task>();

        assert!(rqst.inner.try_lock_response());
        assert!(resp.inner.has_response_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_response_multiple() {
        let (rqst, _resp) = channel::<Task>();

        rqst.inner.try_lock_response();

        assert!(!rqst.inner.try_lock_response());
    }

    #[test]
    fn test_inner_try_unlock_response() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.has_response_lock.store(true, Ordering::SeqCst);

        rqst.inner.unlock_response();

        assert!(!resp.inner.has_response_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_flag_request() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.flag_request();

        assert!(resp.inner.has_request.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_unflag_request() {
        let (rqst, resp) = channel::<Task>();

        resp.inner.has_request.store(true, Ordering::SeqCst);

        assert!(rqst.inner.try_unflag_request());
        assert!(!resp.inner.has_request.load(Ordering::SeqCst));

        assert!(!rqst.inner.try_unflag_request());
    }

    #[test]
    fn test_inner_try_unflag_request_multiple() {
        let (rqst, resp) = channel::<Task>();

        resp.inner.has_request.store(true, Ordering::SeqCst);

        rqst.inner.try_unflag_request();

        assert!(!rqst.inner.try_unflag_request());
    }

    #[test]
    fn test_inner_set_datum() {
        let (_rqst, resp) = channel::<Task>();

        let task = Box::new(move || { println!("Hello World!"); }) as Task;

        resp.inner.set_datum(task);

        assert!(resp.inner.has_datum.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_get_datum_with_data() {
        let (rqst, resp) = channel::<Task>();

        let var = Arc::new(AtomicUsize::new(0));
        let var2 = var.clone();

        let task = Box::new(move || {
            var2.fetch_add(1, Ordering::SeqCst);
        }) as Task;

        // Plant the datum by hand so that only `try_get_datum()` is under test.
        unsafe {
            *resp.inner.datum.get() = Some(task);
        }
        resp.inner.has_datum.store(true, Ordering::SeqCst);

        match rqst.inner.try_get_datum() {
            Ok(t) => {
                t.call_box();
                assert_eq!(var.load(Ordering::SeqCst), 1);
            },
            Err(err) => panic!("expected a datum, got {:?}", err),
        }
    }

    #[test]
    fn test_inner_try_get_datum_no_data() {
        let (rqst, _resp) = channel::<Task>();

        match rqst.inner.try_get_datum() {
            Err(TryReceiveError::Empty) => {},
            _ => panic!("expected TryReceiveError::Empty"),
        }
    }

    #[test]
    fn test_requester_try_request() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_requester_try_request_multiple() {
        let (rqst, _resp) = channel::<Task>();

        rqst.inner.try_lock_request();

        match rqst.try_request() {
            Err(TryRequestError::Locked) => {},
            Ok(_) => panic!("expected TryRequestError::Locked"),
        }
    }

    #[test]
    fn test_request_contract_try_receive() {
        let (rqst, resp) = channel::<Task>();

        let var = Arc::new(AtomicUsize::new(0));
        let var2 = var.clone();

        let task = Box::new(move || {
            var2.fetch_add(1, Ordering::SeqCst);
        }) as Task;

        let mut contract = rqst.try_request().unwrap();

        resp.inner.set_datum(task);

        match contract.try_receive() {
            Ok(task) => {
                task.call_box();
            },
            Err(err) => panic!("expected a datum, got {:?}", err),
        }

        assert!(contract.done);
        assert_eq!(var.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_request_contract_try_receive_no_data() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        match contract.try_receive() {
            Err(TryReceiveError::Empty) => {},
            _ => panic!("expected TryReceiveError::Empty"),
        }

        assert!(!contract.done);

        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_request_contract_try_receive_done() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        contract.done = true;

        match contract.try_receive() {
            Err(TryReceiveError::Done) => {},
            _ => panic!("expected TryReceiveError::Done"),
        }
    }

    #[test]
    fn test_request_contract_try_cancel() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        assert!(contract.try_cancel());
    }

    #[test]
    fn test_request_contract_try_cancel_too_late() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        // Simulate a responder claiming the request first.
        rqst.inner.try_unflag_request();

        assert!(!contract.try_cancel());
        assert!(!contract.done);

        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_request_contract_try_cancel_done() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        contract.done = true;

        assert!(!contract.try_cancel());
    }

    #[test]
    #[should_panic(expected = "Dropping RequestContract without receiving data!")]
    fn test_request_contract_drop_without_receiving_data() {
        let (rqst, _resp) = channel::<Task>();

        let _contract = rqst.try_request().unwrap();
    }

    #[test]
    fn test_responder_try_respond() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.flag_request();

        let mut contract = resp.try_respond().unwrap();

        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_responder_try_respond_no_request() {
        let (_rqst, resp) = channel::<Task>();

        match resp.try_respond() {
            Err(TryRespondError::NoRequest) => {},
            _ => panic!("expected TryRespondError::NoRequest"),
        }
    }

    #[test]
    fn test_responder_try_respond_multiple() {
        let (_rqst, resp) = channel::<Task>();

        resp.inner.try_lock_response();

        match resp.try_respond() {
            Err(TryRespondError::Locked) => {},
            _ => panic!("expected TryRespondError::Locked"),
        }
    }

    #[test]
    fn test_response_contract_send() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.flag_request();

        let contract = resp.try_respond().unwrap();

        contract.send(Box::new(move || { println!("Hello World!"); }) as Task);

        // Sending publishes the datum and releases the responding side.
        assert!(rqst.inner.has_datum.load(Ordering::SeqCst));
        assert!(!rqst.inner.has_response_lock.load(Ordering::SeqCst));
    }

    #[test]
    #[should_panic(expected = "Dropping ResponseContract without sending data!")]
    fn test_response_contract_drop_without_sending_data() {
        let (rqst, resp) = channel::<Task>();

        // A request must be pending, otherwise `try_respond()` fails with
        // `NoRequest` and the test would panic in `unwrap()` instead of in
        // the contract's destructor.
        rqst.inner.flag_request();

        let _contract = resp.try_respond().unwrap();
    }
}