transmitter/
lib.rs

1use std::fmt::{self,Debug};
2use std::sync::{Arc,Condvar,Mutex};
3
/// Shared state behind a one-shot hand-off: a single optional value guarded
/// by a mutex, paired with a condition variable used to wake a waiting thread.
struct InnerOne<T> {
    /// `None` until `set` stores a value; emptied again when `wait` takes it.
    payload: Mutex<Option<T>>,
    /// Notified by `set` after a value has been stored.
    cond: Condvar,
}

impl<T> InnerOne<T> {
    /// Creates an empty slot with no value stored.
    fn new() -> InnerOne<T> {
        let payload = Mutex::new(None);
        let cond = Condvar::new();
        InnerOne { payload, cond }
    }

    /// Stores `t` in the slot and notifies one thread blocked in `wait`.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn set(&self, t: T) {
        let mut slot = self.payload.lock().unwrap();
        *slot = Some(t);
        // Notify while still holding the guard; it is released on return.
        self.cond.notify_one();
    }

    /// Blocks the calling thread until a value is present, then removes and
    /// returns it, leaving the slot empty.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn wait(&self) -> T {
        let mut slot = self.payload.lock().unwrap();
        loop {
            // Re-check after every wake-up: the condition variable may return
            // without a value having been stored.
            if let Some(value) = slot.take() {
                return value;
            }
            slot = self.cond.wait(slot).unwrap();
        }
    }
}
28
29pub struct OneGet<T>(Arc<InnerOne<Option<T>>>);
30impl<T> OneGet<T> {
31    pub fn is_ready(&self) -> bool {
32        // relaxed variant
33        Arc::strong_count(&self.0) == 1
34    }
35    pub fn wait(self) -> Option<T> {
36        self.0.wait()
37    }
38    pub fn try_get(self) -> Result<Option<T>,OneGet<T>> {
39        match Arc::try_unwrap(self.0) {
40            Ok(inner) => Ok(inner.wait()),
41            Err(arc_inner) => Err(OneGet(arc_inner)),
42        }
43    }
44}
45impl<T> Debug for OneGet<T> {
46    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47        write!(f, "OneGet")
48    }
49}
50
51pub struct OneSet<T>(Arc<InnerOne<Option<T>>>,bool);
52impl<T> OneSet<T> {
53    pub fn is_needed(&self) -> bool {
54        // relaxed variant
55        Arc::strong_count(&self.0) == 2
56    }
57    pub fn set(mut self, t: T) {
58        self.0.set(Some(t));
59        self.1 = true;
60    }
61}
62impl<T> Debug for OneSet<T> {
63    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64        write!(f, "OneSet")
65    }
66}   
67impl<T> Drop for OneSet<T> {
68    fn drop(&mut self) {
69        if !self.1 {
70            self.0.set(None);
71        }
72    }
73}
74
75pub fn oneshot<T>() -> (OneSet<T>,OneGet<T>) {
76    let r = Arc::new(InnerOne::new());
77    (OneSet(r.clone(),false),OneGet(r))
78}
79
80
#[cfg(test)]
mod tests {
    use super::oneshot;
    use std::thread;
    use std::time::Duration;

    /// Delay used to force one side to reach its call before the other.
    const PAUSE: Duration = Duration::from_millis(500);

    /// The receiver starts waiting first; the sender delivers after a pause.
    #[test]
    fn test_wait_setting() {
        let (sender, receiver) = oneshot();
        let worker = thread::spawn(move || {
            thread::sleep(PAUSE);
            sender.set(5);
        });
        let received = receiver.wait();
        assert_eq!(received, Some(5));
        worker.join().unwrap();
    }

    /// The sender delivers immediately; the receiver waits after a pause.
    #[test]
    fn test_wait_getting() {
        let (sender, receiver) = oneshot();
        let worker = thread::spawn(move || {
            sender.set(3);
        });
        thread::sleep(PAUSE);
        let received = receiver.wait();
        assert_eq!(received, Some(3));
        worker.join().unwrap();
    }

    /// Dropping the sender without setting a value yields `None`.
    #[test]
    fn test_drop_setter() {
        let (sender, receiver) = oneshot::<u64>();
        let worker = thread::spawn(move || {
            drop(sender);
        });
        thread::sleep(PAUSE);
        let received = receiver.wait();
        assert_eq!(received, None);
        worker.join().unwrap();
    }
}
121