reqchan/
lib.rs

1//! This crate defines a channel for requesting and receiving data. Each
2//! channel has only one requesting end, but it can have multiple responding
3//! ends. It is useful for implementing work sharing.
4//!
//! The two ends of the channel are asynchronous with respect to each other,
//! so the channel is effectively nonblocking. However, if multiple responding
//! ends try to respond to the same request, only one will succeed; the rest
//! will return errors.
9//!
10//! # Design
11//!
12//! ## Overview
13//!
14//! `reqchan` is built around the two halves of the channel: `Requester`
15//! and `Responder`. Both implement methods, `Requester::try_request()` and
//! `Responder::try_respond()`, that, when successful, lock their corresponding
17//! side of the channel and return contracts. `RequestContract` **requires** the
18//! user to either successfully receive a datum or cancel the request.
19//! `ResponseContract` requires the user to send a datum. These requirements
20//! prevent the system from losing data sent through the channel.
21//!
22//! ## Locking 
23//!
//! `Responder::try_respond()` locks the responding side to prevent other
//! potential responders from responding to the same request. Likewise,
//! `Requester::try_request()` locks the requesting side of the channel
//! to prevent the user from issuing multiple outstanding requests.
//! Each lock is released when its corresponding contract object is dropped.
29//!
30//! ## Contracts 
31//!
//! `Requester::try_request()` has to issue a `RequestContract` so the
//! thread of execution does not block waiting for a response. However,
//! that reason does not apply to `Responder::try_respond()`. I originally
//! made `Responder::try_respond()` send the datum. However, that required
//! the user to have the datum available to send even if it could not be sent,
//! and it required the user to handle the returned datum if it could not be
//! sent. If the datum was, say, half the contents of a `Vec`, this might entail
//! lots of expensive memory allocation. Therefore, I made `Responder::try_respond()`
//! return a `ResponseContract` indicating that the responder *could* and *would*
//! respond to the request. This way the user only has to perform the necessary
//! steps to send the datum if the datum must be sent.
43//!
44//! # Examples 
45//! 
46//! ## Simple Example
47//! 
48//! This simple, single-threaded example demonstrates most of the API.
49//! The only thing left out is `RequestContract::try_cancel()`.
50//!
51//! ```
52//! extern crate reqchan;
53//!
54//! // Create channel.
55//! let (requester, responder) = reqchan::channel::<u32>(); 
56//!
57//! // Issue request.
58//! let mut request_contract = requester.try_request().unwrap();
59//!
60//! // Respond with number.
61//! responder.try_respond().unwrap().send(5);
62//!
63//! // Receive and print number.
64//! println!("Number is {}", request_contract.try_receive().unwrap());
65//! ```
66//!
67//! ## More Complex Example 
68//!
69//! This more complex example demonstrates more "real-world" usage.
70//! One thread requests a 'task' (i.e. a closure to run), and the
71//! other two threads fight over who gets to respond with their
72//! own personal task. Meanwhile, the requesting thread is polling
73//! for a task, and if it gets one in time, it runs it. Regardless of
74//! whether or not the receiver got a task or timed out, the receiver
75//! notifies other threads to stop running, and stops itself.
76//!
77//! ```
78//! extern crate reqchan as chan;
79//! 
80//! use std::sync::Arc;
81//! use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
82//! use std::thread;
83//! use std::time::{Duration, Instant};
84//! 
85//! // Stuff to make it easier to pass around closures.
86//! trait FnBox {
87//!     fn call_box(self: Box<Self>);
88//! }
89//! impl<F: FnOnce()> FnBox for F {
90//!     fn call_box(self: Box<F>) {
91//!         (*self)()
92//!     }
93//! }
94//! type Task = Box<FnBox + Send + 'static>;
95//! 
96//! // Variable used to test calling a `Task` sent between threads.
97//! let test_var = Arc::new(AtomicUsize::new(0));
98//! let test_var2 = test_var.clone();
99//! let test_var3 = test_var.clone();
100//! 
101//! // Variable needed to stop `responder` thread if `requester` times out
102//! let should_exit = Arc::new(AtomicBool::new(false));
103//! let should_exit_copy_1 = should_exit.clone();
104//! let should_exit_copy_2 = should_exit.clone();
105//! 
106//! let (requester, responder) = chan::channel::<Task>();
107//! let responder2 = responder.clone();
108//! 
109//! // requesting thread
110//! let requester_handle = thread::spawn(move || {
111//!     let start_time = Instant::now();
112//!     let timeout = Duration::new(0, 1000000);
113//!     
114//!     let mut contract = requester.try_request().unwrap();
115//! 
116//!     loop {
117//!         // Try to cancel request and stop threads if runtime
118//!         // has exceeded `timeout`.
119//!         if start_time.elapsed() >= timeout {
120//!             // Try to cancel request.
121//!             // This should only fail if `responder` has started responding.
122//!             if contract.try_cancel() {
123//!                 // Notify other threads to stop.
124//!                 should_exit.store(true, Ordering::SeqCst);
125//!                 break;
126//!             }
127//!         }
128//! 
129//!         // Try getting `task` from `responder`.
130//!         match contract.try_receive() {
131//!             // `contract` received `task`.
132//!             Ok(task) => {
133//!                 task.call_box();
134//!                 // Notify other threads to stop.
135//!                 should_exit.store(true, Ordering::SeqCst);
136//!                 break;
137//!             },
138//!             // Continue looping if `responder` has not yet sent `task`.
139//!             Err(chan::TryReceiveError::Empty) => {},
140//!             // The only other error is `chan::TryReceiveError::Done`.
141//!             // This only happens if we call `contract.try_receive()`
142//!             // after either receiving data or cancelling the request.
143//!             _ => unreachable!(),
144//!         }
145//!     }
146//! });
147//! 
148//! // responding thread 1
149//! let responder_1_handle = thread::spawn(move || {
150//!     let mut tasks = vec![Box::new(move || {
151//!         test_var2.fetch_add(1, Ordering::SeqCst);
152//!     }) as Task];
153//!     
154//!     loop {
155//!         // Exit loop if `receiver` has timed out.
156//!         if should_exit_copy_1.load(Ordering::SeqCst) {
157//!             break;
158//!         }
159//!         
160//!         // Send `task` to `receiver` if it has issued a request.
161//!         match responder2.try_respond() {
162//!             // `responder2` can respond to request.
163//!             Ok(contract) => {
164//!                 contract.send(tasks.pop().unwrap());
165//!                 break;
166//!             },
//!             // Either `requester` has not yet made a request,
//!             // or the other responder (`responder`) already handled it.
169//!             Err(chan::TryRespondError::NoRequest) => {},
//!             // The other responder (`responder`) is processing the request.
171//!             Err(chan::TryRespondError::Locked) => { break; },
172//!         }
173//!     }
174//! });
175//! 
176//! // responding thread 2
177//! let responder_2_handle = thread::spawn(move || {
178//!     let mut tasks = vec![Box::new(move || {
179//!         test_var3.fetch_add(2, Ordering::SeqCst);
180//!     }) as Task];
181//!     
182//!     loop {
183//!         // Exit loop if `receiver` has timed out.
184//!         if should_exit_copy_2.load(Ordering::SeqCst) {
185//!             break;
186//!         }
187//!         
188//!         // Send `task` to `receiver` if it has issued a request.
189//!         match responder.try_respond() {
//!             // `responder` can respond to request.
191//!             Ok(contract) => {
192//!                 contract.send(tasks.pop().unwrap());
193//!                 break;
194//!             },
//!             // Either `requester` has not yet made a request,
//!             // or the other responder (`responder2`) already handled it.
197//!             Err(chan::TryRespondError::NoRequest) => {},
//!             // The other responder (`responder2`) is processing the request.
199//!             Err(chan::TryRespondError::Locked) => { break; },
200//!         }
201//!     }
202//! });
203//! 
204//! requester_handle.join().unwrap();
205//! responder_1_handle.join().unwrap();
206//! responder_2_handle.join().unwrap();
207//!
208//! // `num` can be 0, 1 or 2.
209//! let num = test_var.load(Ordering::SeqCst);
210//! println!("Number is {}", num);
211//! ```
212
213use std::cell::UnsafeCell;
214use std::sync::Arc;
215use std::sync::atomic::{AtomicBool, Ordering};
216
217/// This function creates a `reqchan` and returns a tuple containing the
218/// two ends of this bidirectional request->response channel.
219///
220/// # Example
221/// 
222/// ```
223/// extern crate reqchan;
224///
225/// #[allow(unused_variables)]
226/// let (requester, responder) = reqchan::channel::<u32>(); 
227/// ```
228pub fn channel<T>() -> (Requester<T>, Responder<T>) {
229    let inner = Arc::new(Inner {
230        has_request_lock: AtomicBool::new(false),
231        has_response_lock: AtomicBool::new(false),
232        has_request: AtomicBool::new(false),
233        has_datum: AtomicBool::new(false),
234        datum: UnsafeCell::new(None),
235    });
236
237    (
238        Requester { inner: inner.clone() },
239        Responder { inner: inner.clone() },
240    )
241}
242
243/// This end of the channel requests and receives data from its `Responder`(s).
244pub struct Requester<T> {
245    inner: Arc<Inner<T>>,
246}
247
248impl<T> Requester<T> {
249    /// This methods tries to request item(s) from one or more `Responder`(s).
250    /// If successful, it returns a `RequestContract` to either poll for data or
251    /// cancel the request.
252    ///
253    /// # Warning
254    ///
255    /// Only **one** `RequestContract` may be active at a time.
256    ///
257    /// # Example
258    /// 
259    /// ```
260    /// extern crate reqchan as chan;
261    ///
262    /// let (requester, responder) = chan::channel::<u32>(); 
263    ///
264    /// // Create request.
265    /// let mut request_contract = requester.try_request().unwrap();
266    /// 
267    /// // We have to wait for `request_contract` to go out of scope
268    /// // before we can make another request.
269    /// // match requester.try_request() {
270    /// //     Err(chan::TryRequestError::Locked) => {
271    /// //         println!("We already have a request contract!");
272    /// //     },
273    /// //     _ => unreachable!(),
274    /// // }
275    ///
276    /// responder.try_respond().unwrap().send(5);
277    /// println!("Got number {}", request_contract.try_receive().unwrap());
278    /// ```
279    pub fn try_request(&self) -> Result<RequestContract<T>, TryRequestError> {
280        // First, try to lock the requesting side.
281        if !self.inner.try_lock_request() {
282            return Err(TryRequestError::Locked);
283        }
284
285        // Next, flag a request.
286        self.inner.flag_request();
287
288        // Then return a `RequestContract`.
289        Ok(RequestContract {
290            inner: self.inner.clone(),
291            done: false,
292        })
293    }
294}
295
296/// This is the contract returned by a successful `Requester::try_request()`.
297/// It represents the caller's exclusive access to the requesting side of
298/// the channel. The user can either try to get a datum from the responding side
299/// or *attempt* to cancel the request. To prevent data loss, `RequestContract`
300/// will panic if the user has not received a datum or cancelled the request.
301pub struct RequestContract<T> {
302    inner: Arc<Inner<T>>,
303    done: bool,
304}
305
306impl<T> RequestContract<T> {
307    /// This method attempts to receive a datum from one or more responder(s).
308    ///
309    /// # Warning
310    ///
311    /// It returns `Err(TryReceiveError::Done)` if the user called it
312    /// after either receiving a datum or cancelling the request.
313    ///
314    /// # Example
315    /// 
316    /// ```
317    /// extern crate reqchan as chan;
318    ///
319    /// let (requester, responder) = chan::channel::<u32>(); 
320    ///
321    /// let mut request_contract = requester.try_request().unwrap();
322    ///
323    /// // The responder has not responded yet. 
324    /// match request_contract.try_receive() {
325    ///     Err(chan::TryReceiveError::Empty) => { println!("No Data yet!"); },
326    ///     _ => unreachable!(),
327    /// }
328    /// 
329    /// responder.try_respond().unwrap().send(6);
330    /// 
331    /// // The responder has responded now.
332    /// match request_contract.try_receive() {
333    ///     Ok(num) => { println!("Number: {}", num); },
334    ///     _ => unreachable!(),
335    /// }
336    ///
337    /// // We need to issue another request to receive more data.
338    /// match request_contract.try_receive() {
339    ///     Err(chan::TryReceiveError::Done) => {
340    ///         println!("We already received data!");
341    ///     },
342    ///     _ => unreachable!(),
343    /// }
344    /// ```
345    pub fn try_receive(&mut self) -> Result<T, TryReceiveError> {
346        // Do not try to receive anything if the contract already received data.
347        if self.done {
348            return Err(TryReceiveError::Done);
349        }
350
351        match self.inner.try_get_datum() {
352            Ok(datum) => {
353                self.done = true;
354                Ok(datum)
355            },
356            Err(err) => Err(err),
357        }
358    } 
359
360    /// This method attempts to cancel a request. This is useful for
361    /// implementing a timeout.
362    ///
363    /// # Return
364    ///
365    /// * `true` - Cancelled request
366    /// * `false` - `Responder` started processing request first
367    ///
368    /// # Warning
369    ///
370    /// It also returns `false` if the user called it after
371    /// either receiving a datum or cancelling the request.
372    ///
373    /// # Example
374    /// 
375    /// ```
376    /// extern crate reqchan as chan;
377    ///
378    /// let (requester, responder) = chan::channel::<u32>(); 
379    ///
380    /// {
381    ///     let mut contract = requester.try_request().unwrap();
382    ///
383    ///     // We can cancel the request since `responder` has not
384    ///     // yet responded to it.
385    ///     assert_eq!(contract.try_cancel(), true);
386    ///
387    ///     // Both contracts go out of scope here
388    /// }
389    ///
390    /// {
391    ///     let mut request_contract = requester.try_request().unwrap();
392    ///
393    ///     responder.try_respond().unwrap().send(7);
394    ///
395    ///     // It is too late to cancel the request!
396    ///     if !request_contract.try_cancel() {
397    ///         println!("Number: {}", request_contract.try_receive().unwrap());
398    ///     }
399    ///
400    ///     // Both contracts go out of scope here
401    /// }
402    /// ```
403    pub fn try_cancel(&mut self) -> bool {
404        // Do not try to unsend if the contract already received data.
405        if self.done {
406            return false;
407        }
408
409        match self.inner.try_unflag_request() {
410            true => {
411                self.done = true;
412                true
413            },
414            false => false,
415        }
416    }
417}
418
419impl<T> Drop for RequestContract<T> {
420    fn drop(&mut self) {
421        if !self.done {
422            panic!("Dropping RequestContract without receiving data!");
423        }
424
425        self.inner.unlock_request();
426    }
427}
428
429/// This end of the channel sends data in response to requests from
430/// its `Requester`.
431pub struct Responder<T> {
432    inner: Arc<Inner<T>>,
433}
434
435impl<T> Responder<T> {
436    /// This method signals the intent of `Responder` to respond to a request.
437    /// If successful, it returns a `ResponseContract` to ensure the user sends
438    /// a datum.
439    ///
440    /// # Warning
441    ///
442    /// Only **one** `ResponseContract` may be active at a time.
443    ///
444    /// # Example
445    /// 
446    /// ```
447    /// extern crate reqchan as chan;
448    ///
449    /// let (requester, responder) = chan::channel::<u32>(); 
450    ///
451    /// // `requester` has not yet issued a request.
452    /// match responder.try_respond() {
453    ///     Err(chan::TryRespondError::NoRequest) => {
454    ///         println!("There is no request!");
455    ///     },
456    ///     _ => unreachable!(),
457    /// }
458    ///
459    /// let mut request_contract = requester.try_request().unwrap();
460    ///
461    /// // `requester` has issued a request.
462    /// let mut response_contract = responder.try_respond().unwrap();
463    ///
464    /// // We cannot issue another response to the request.
465    /// match responder.try_respond() {
466    ///     Err(chan::TryRespondError::Locked) => {
467    ///         println!("We cannot issue multiple responses to a request!");
468    ///     },
469    ///     _ => unreachable!(),
470    /// }
471    ///
472    /// response_contract.send(8);
473    /// 
474    /// println!("Number is {}", request_contract.try_receive().unwrap());
475    /// ```
476    pub fn try_respond(&self) -> Result<ResponseContract<T>,
477                                        TryRespondError> {
478
479        // First try to lock the responding side.
480        if !self.inner.try_lock_response() {
481            return Err(TryRespondError::Locked);
482        }
483        
484        // Next, atomically check for a request and signal a response to it.
485        // If no request exists, drop the lock and return the data.
486        if !self.inner.try_unflag_request() {
487            self.inner.unlock_response();
488            return Err(TryRespondError::NoRequest);
489        }
490     
491        Ok(ResponseContract {
492            inner: self.inner.clone(),
493            done: false,
494        })
495    }
496}
497
498impl<T> Clone for Responder<T> {
499    fn clone(&self) -> Self {
500        Responder {
501            inner: self.inner.clone(),
502        }
503    }
504}
505
506/// This is the contract returned by a successful `Responder::try_response()`.
507/// It represents the caller's exclusive access to the responding side of
508/// the channel. It ensures the user sends a datum by panicking if they have not.
509pub struct ResponseContract<T> {
510    inner: Arc<Inner<T>>,
511    done: bool,
512}
513
514impl<T> ResponseContract<T> {
515    /// This method tries to send a datum to the requesting end of the channel.
516    /// It will then consume itself, thereby freeing the responding side of
517    /// the channel.
518    ///
519    /// # Arguments
520    ///
521    /// * `datum` - The item(s) to send
522    ///
523    /// # Example
524    /// 
525    /// ```
526    /// extern crate reqchan as chan;
527    ///
528    /// let (requester, responder) = chan::channel::<u32>(); 
529    ///
530    /// let mut request_contract = requester.try_request().unwrap();
531    ///
532    /// let mut response_contract = responder.try_respond().unwrap();
533    ///
534    /// // We send data to the requesting end.
535    /// response_contract.send(9);
536    ///
537    /// println!("Number is {}", request_contract.try_receive().unwrap());
538    /// ```
539    pub fn send(mut self, datum: T) {
540        self.inner.set_datum(datum);
541        self.done = true;
542    }
543}
544
545impl<T> Drop for ResponseContract<T> {
546    fn drop(&mut self) {
547        if !self.done {
548            panic!("Dropping ResponseContract without sending data!");
549        }
550
551        self.inner.unlock_response();
552    }
553}
554
/// State shared by both ends of a channel.
#[doc(hidden)]
struct Inner<T> {
    /// `true` while the requesting side of the channel is locked.
    has_request_lock: AtomicBool,
    /// `true` while the responding side of the channel is locked.
    has_response_lock: AtomicBool,
    /// `true` while a request is flagged and has not yet been claimed by a
    /// responder or withdrawn by the requester.
    has_request: AtomicBool,
    /// `true` while `datum` holds a value that has not been taken yet.
    has_datum: AtomicBool,
    /// Slot through which the datum travels from responder to requester.
    datum: UnsafeCell<Option<T>>,
}

// SAFETY: `Inner` hands the `T` stored in `datum` from the responding thread
// to the requesting thread, so sharing an `Inner<T>` between threads is only
// sound when `T` itself may be sent across threads. Access to the
// `UnsafeCell` is serialized by the atomic flags above: the slot is written
// before `has_datum` is set and read only after `has_datum` was atomically
// cleared.
unsafe impl<T: Send> Sync for Inner<T> {}
565
566#[doc(hidden)]
567impl<T> Inner<T> {
568    /// This method indicates that the requesting side has made a request.
569    ///
570    /// # Warning
571    ///
572    /// **ONLY** the requesting side of the channel should call it.
573    ///
574    /// # Invariant
575    ///
576    /// * self.has_request_lock == true
577    #[inline]
578    fn flag_request(&self) {
579        self.has_request.store(true, Ordering::SeqCst);
580    }
581
582    /// This method atomically checks to see if the requesting end
583    /// issued a request and unflag the request.
584    #[inline]
585    fn try_unflag_request(&self) -> bool {
586        let (old, new) = (true, false);
587
588        self.has_request.compare_and_swap(old,
589                                          new,
590                                          Ordering::SeqCst) == old
591    }
592
593    /// This method sets the inner datum to the specified value.
594    ///
595    /// # Arguments
596    ///
597    /// * datum - The datum to set
598    ///
599    /// # Warning
600    ///
601    /// **ONLY** the responding side of the channel should call it.
602    ///
603    /// # Invariant
604    ///
605    /// * self.has_response_lock == true
606    ///
607    /// * (*self.datum.get()).is_none() == true
608    ///
609    /// * self.has_datum == false
610    #[inline]
611    fn set_datum(&self, data: T) {
612        // First update inner datum.
613        unsafe {
614            *self.datum.get() = Some(data);
615        }
616
617        // Then indicate the presence of a new datum.
618        self.has_datum.store(true, Ordering::SeqCst);
619    }
620    
621    /// This method tries to get the datum out of `Inner`.
622    ///
623    /// # Warning
624    ///
625    /// **ONLY** the requesting side of the channel should call it.
626    ///
627    /// # Invariant
628    ///
629    /// * self.has_request_lock == true
630    ///
631    /// * if self.has_datum == true then (*self.datum.get()).is_some() == true
632    #[inline]
633    fn try_get_datum(&self) -> Result<T, TryReceiveError> {
634        // First check to see if data exists.
635        let (old, new) = (true, false);
636
637        if self.has_datum.compare_and_swap(old,
638                                           new,
639                                           Ordering::SeqCst) == old {
640            // If so, retrieve the data and unwrap it from its Option container.
641            unsafe {
642                Ok((*self.datum.get()).take().unwrap())
643            }
644        }
645        else {
646            Err(TryReceiveError::Empty)
647        }
648    }
649
650    /// This method tries to lock the requesting side of the channel.
651    /// It returns a `boolean` indicating whether or not it succeeded.
652    #[inline]
653    fn try_lock_request(&self) -> bool {
654        let (old, new) = (false, true);
655
656        self.has_request_lock.compare_and_swap(old, new, Ordering::SeqCst) == old
657    }
658
659    /// This method unlocks the requesting side of the channel.
660    #[inline]
661    fn unlock_request(&self) {
662        self.has_request_lock.store(false, Ordering::SeqCst);
663    }
664
665    /// This method tries to lock the responding side of the channel.
666    /// It returns a `boolean` indicating whether or not it succeeded.
667    #[inline]
668    fn try_lock_response(&self) -> bool {
669        let (old, new) = (false, true);
670
671        self.has_response_lock.compare_and_swap(old, new, Ordering::SeqCst) == old
672    }
673
674    /// This method unlocks the responding side of the channel.
675    #[inline]
676    fn unlock_response(&self) {
677        self.has_response_lock.store(false, Ordering::SeqCst);
678    }
679}
680
/// Error returned by `Requester::try_request()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRequestError {
    /// The requesting side of the channel is already locked.
    Locked,
}

impl ::std::fmt::Display for TryRequestError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        match *self {
            TryRequestError::Locked => {
                f.write_str("the requesting side of the channel is already locked")
            }
        }
    }
}

impl ::std::error::Error for TryRequestError {}
685
/// Error returned by `RequestContract::try_receive()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryReceiveError {
    /// No datum has been sent yet.
    Empty,
    /// The contract already received a datum or cancelled its request.
    Done,
}

impl ::std::fmt::Display for TryReceiveError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let message = match *self {
            TryReceiveError::Empty => "no datum has been sent yet",
            TryReceiveError::Done => "the request was already fulfilled or cancelled",
        };

        f.write_str(message)
    }
}

impl ::std::error::Error for TryReceiveError {}
691
/// Error returned by `Responder::try_respond()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRespondError {
    /// There is no pending request to respond to.
    NoRequest,
    /// The responding side of the channel is already locked.
    Locked,
}

impl ::std::fmt::Display for TryRespondError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let message = match *self {
            TryRespondError::NoRequest => "there is no pending request",
            TryRespondError::Locked => "the responding side of the channel is already locked",
        };

        f.write_str(message)
    }
}

impl ::std::error::Error for TryRespondError {}
697
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Object-safe stand-in for `FnOnce()` so that boxed closures can be
    /// sent through the channel and then called.
    trait FnBox {
        fn call_box(self: Box<Self>);
    }

    impl<F: FnOnce()> FnBox for F {
        fn call_box(self: Box<F>) {
            (*self)()
        }
    }

    type Task = Box<FnBox + Send + 'static>;

    /// Builds a task that adds one to a fresh counter and returns both.
    fn counting_task() -> (Arc<AtomicUsize>, Task) {
        let counter = Arc::new(AtomicUsize::new(0));
        let handle = counter.clone();

        let task = Box::new(move || {
            handle.fetch_add(1, Ordering::SeqCst);
        }) as Task;

        (counter, task)
    }

    /// Builds a task that only prints a greeting.
    fn hello_task() -> Task {
        Box::new(move || { println!("Hello World!"); }) as Task
    }

    #[test]
    fn test_channel() {
        let (_rqst, _resp) = channel::<Task>();
    }

    #[test]
    fn test_inner_try_lock_request() {
        let (rqst, resp) = channel::<Task>();

        assert!(rqst.inner.try_lock_request());
        assert!(resp.inner.has_request_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_request_multiple() {
        let (rqst, _resp) = channel::<Task>();

        rqst.inner.try_lock_request();

        assert!(!rqst.inner.try_lock_request());
    }

    #[test]
    fn test_inner_try_unlock_request() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.has_request_lock.store(true, Ordering::SeqCst);

        rqst.inner.unlock_request();

        assert!(!resp.inner.has_request_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_response() {
        let (rqst, resp) = channel::<Task>();

        assert!(rqst.inner.try_lock_response());
        assert!(resp.inner.has_response_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_response_multiple() {
        let (rqst, _resp) = channel::<Task>();

        rqst.inner.try_lock_response();

        assert!(!rqst.inner.try_lock_response());
    }

    #[test]
    fn test_inner_try_unlock_response() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.has_response_lock.store(true, Ordering::SeqCst);

        rqst.inner.unlock_response();

        assert!(!resp.inner.has_response_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_flag_request() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.flag_request();

        assert!(resp.inner.has_request.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_unflag_request() {
        let (rqst, resp) = channel::<Task>();

        resp.inner.has_request.store(true, Ordering::SeqCst);

        assert!(rqst.inner.try_unflag_request());
        assert!(!resp.inner.has_request.load(Ordering::SeqCst));

        assert!(!rqst.inner.try_unflag_request());
    }

    #[test]
    fn test_inner_try_unflag_request_multiple() {
        let (rqst, resp) = channel::<Task>();

        resp.inner.has_request.store(true, Ordering::SeqCst);

        rqst.inner.try_unflag_request();

        assert!(!rqst.inner.try_unflag_request());
    }

    #[test]
    fn test_inner_set_datum() {
        let (_rqst, resp) = channel::<Task>();

        resp.inner.set_datum(hello_task());

        assert!(resp.inner.has_datum.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_get_datum_with_data() {
        let (rqst, resp) = channel::<Task>();

        let (var, task) = counting_task();

        // Store the datum by hand so that this test does not depend on
        // `Inner::set_datum()`.
        unsafe {
            *resp.inner.datum.get() = Some(task);
        }
        resp.inner.has_datum.store(true, Ordering::SeqCst);

        match rqst.inner.try_get_datum() {
            Ok(t) => {
                t.call_box();
                assert_eq!(var.load(Ordering::SeqCst), 1);
            },
            Err(err) => panic!("expected a datum, got error `{:?}`", err),
        }
    }

    #[test]
    fn test_inner_try_get_datum_no_data() {
        let (rqst, _resp) = channel::<Task>();

        match rqst.inner.try_get_datum() {
            Err(TryReceiveError::Empty) => {},
            Err(err) => panic!("expected `Empty`, got error `{:?}`", err),
            Ok(_) => panic!("expected `Empty`, got a datum"),
        }
    }

    #[test]
    fn test_requester_try_request() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        // Mark the contract as fulfilled so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_requester_try_request_multiple() {
        let (rqst, _resp) = channel::<Task>();

        rqst.inner.try_lock_request();

        match rqst.try_request() {
            Err(TryRequestError::Locked) => {},
            Ok(mut contract) => {
                // Defuse the contract so that its drop does not panic while
                // this test failure is unwinding.
                contract.done = true;
                panic!("expected `Locked`, got a contract");
            },
        }
    }

    #[test]
    fn test_request_contract_try_receive() {
        let (rqst, resp) = channel::<Task>();

        let (var, task) = counting_task();

        let mut contract = rqst.try_request().unwrap();

        resp.inner.set_datum(task);

        match contract.try_receive() {
            Ok(task) => {
                task.call_box();
            },
            Err(err) => {
                contract.done = true;
                panic!("expected a datum, got error `{:?}`", err);
            },
        }

        assert!(contract.done);
        assert_eq!(var.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_request_contract_try_receive_no_data() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        let outcome = contract.try_receive();
        let still_pending = !contract.done;

        // Mark the contract as fulfilled before asserting so that a failed
        // assertion is not followed by a second panic from its drop.
        contract.done = true;

        match outcome {
            Err(TryReceiveError::Empty) => {},
            Err(err) => panic!("expected `Empty`, got error `{:?}`", err),
            Ok(_) => panic!("expected `Empty`, got a datum"),
        }

        assert!(still_pending);
    }

    #[test]
    fn test_request_contract_try_receive_done() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        contract.done = true;

        match contract.try_receive() {
            Err(TryReceiveError::Done) => {},
            Err(err) => panic!("expected `Done`, got error `{:?}`", err),
            Ok(_) => panic!("expected `Done`, got a datum"),
        }
    }

    #[test]
    fn test_request_contract_try_cancel() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        let cancelled = contract.try_cancel();

        // Defuse the contract in case cancelling unexpectedly failed.
        contract.done = true;

        assert!(cancelled);
    }

    #[test]
    fn test_request_contract_try_cancel_too_late() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        // Simulate a responder claiming the request first.
        rqst.inner.try_unflag_request();

        let cancelled = contract.try_cancel();
        let still_pending = !contract.done;

        // Mark the contract as fulfilled so that dropping it does not panic.
        contract.done = true;

        assert!(!cancelled);
        assert!(still_pending);
    }

    #[test]
    fn test_request_contract_try_cancel_done() {
        let (rqst, _resp) = channel::<Task>();

        let mut contract = rqst.try_request().unwrap();

        contract.done = true;

        assert!(!contract.try_cancel());
    }

    #[test]
    #[should_panic(expected = "Dropping RequestContract without receiving data!")]
    fn test_request_contract_drop_without_receiving_data() {
        let (rqst, _resp) = channel::<Task>();

        let _contract = rqst.try_request().unwrap();
    }

    #[test]
    fn test_responder_try_respond() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.flag_request();

        let mut contract = resp.try_respond().unwrap();

        // Mark the contract as fulfilled so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_responder_try_respond_no_request() {
        let (_rqst, resp) = channel::<Task>();

        match resp.try_respond() {
            Err(TryRespondError::NoRequest) => {},
            Err(err) => panic!("expected `NoRequest`, got error `{:?}`", err),
            Ok(mut contract) => {
                contract.done = true;
                panic!("expected `NoRequest`, got a contract");
            },
        }
    }

    #[test]
    fn test_responder_try_respond_multiple() {
        let (_rqst, resp) = channel::<Task>();

        resp.inner.try_lock_response();

        match resp.try_respond() {
            Err(TryRespondError::Locked) => {},
            Err(err) => panic!("expected `Locked`, got error `{:?}`", err),
            Ok(mut contract) => {
                contract.done = true;
                panic!("expected `Locked`, got a contract");
            },
        }
    }

    #[test]
    fn test_response_contract_send() {
        let (rqst, resp) = channel::<Task>();

        rqst.inner.flag_request();

        let contract = resp.try_respond().unwrap();

        contract.send(hello_task());

        assert!(resp.inner.has_datum.load(Ordering::SeqCst));
    }

    #[test]
    #[should_panic(expected = "Dropping ResponseContract without sending data!")]
    fn test_response_contract_drop_without_sending_data() {
        let (rqst, resp) = channel::<Task>();

        // A request must be pending; otherwise `try_respond()` returns
        // `NoRequest` and the `unwrap()` below would panic before any
        // contract exists, so the drop check would never be exercised.
        rqst.inner.flag_request();

        let _contract = resp.try_respond().unwrap();
    }
}