poll_channel/
lib.rs

1//!# Rust: Poll on channel
2//!
//!`poll-channel` provides a way to poll on several channels in synchronous Rust code; it is built on top of crossbeam channels.
4//!
5//!example
6//!```rust
7//!use poll_channel::{channel, Poll};
8//!
9//!#[test]
10//!fn poll_test() -> Result<(), crossbeam::channel::RecvError> {
11//!    let (tx1, rx1) = channel();
12//!    let (tx2, rx2) = channel();
13//!
14//!    let poller = Poll::new();
//!    poller.append(&[&rx1, &rx2]);
16//!
17//!    let _ = tx1.send(100);
18//!    let _ = tx2.send(200);
19//!    let mut i = 0;
20//!
21//!    while i < 3 {
22//!        let id = poller.poll(0.01);
23//!        if id == rx1.id() {
24//!            let n1 = rx1.recv()?;
25//!            assert!(n1 == 100);
26//!            i += 1;
27//!        } else if id == rx2.id() {
28//!            let n2 = rx2.recv()?;
29//!            assert!(n2 == 200);
30//!            i += 1;
31//!        } else if id == -1 {
32//!            // timeout
33//!            i += 1;
34//!            break;
35//!        }
36//!    }
37//!
38//!    Ok(())
39//!}
40//!```
41use std::{
42    sync::{Arc, Mutex},
43    time::Duration,
44};
45
46pub use crossbeam::channel::RecvError;
47pub use crossbeam::channel::RecvTimeoutError;
48pub use crossbeam::channel::SendError;
49pub use crossbeam::channel::TryRecvError;
50
51pub struct Signal {
52    tx: crossbeam::channel::Sender<i32>,
53    rx: crossbeam::channel::Receiver<i32>,
54}
55
56impl Signal {
57    fn new() -> Self {
58        let (tx, rx) = crossbeam::channel::unbounded();
59        Self { tx, rx }
60    }
61}
62
63pub struct Sender<T> {
64    init: Mutex<bool>,
65    producer: Mutex<Option<SignalSender>>,
66    signal: ArcMutex2<OptionSignal>,
67    tx: crossbeam::channel::Sender<T>,
68    id: i32,
69}
70
71pub type SignalSender = crossbeam::channel::Sender<i32>;
72pub type OptionSignal = Option<Signal>;
73pub type ArcMutex<T> = Arc<Mutex<T>>;
74pub type ArcMutex2<T> = ArcMutex<ArcMutex<T>>;
75static UID: Mutex<i32> = Mutex::new(0);
76
77pub struct Receiver<T> {
78    signal: ArcMutex2<OptionSignal>,
79    rx: crossbeam::channel::Receiver<T>,
80    id: i32,
81}
82
83pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
84    let inner = Arc::new(Mutex::new(None));
85    let signal = Arc::new(Mutex::new(inner));
86    let (tx, rx) = crossbeam::channel::unbounded();
87    let mut id = UID.lock().unwrap();
88    let next = *id;
89    *id += 1;
90    let receiver = Receiver {
91        signal,
92        rx,
93        id: next,
94    };
95    let sender = Sender {
96        producer: Mutex::new(None),
97        signal: receiver.signal.clone(),
98        tx,
99        id: next,
100        init: Mutex::new(false),
101    };
102    (sender, receiver)
103}
104
105impl<T> Clone for Sender<T> {
106    fn clone(&self) -> Self {
107        Self {
108            init: Mutex::new(false),
109            producer: Mutex::new(None),
110            signal: self.signal.clone(),
111            tx: self.tx.clone(),
112            id: self.id,
113        }
114    }
115}
116
117impl<T> Sender<T> {
118    pub fn send(&self, data: T) -> Result<(), SendError<T>> {
119        // avoid mutable, no one races for the mutexes
120        let mut init = self.init.lock().unwrap();
121        let mut producer = self.producer.lock().unwrap();
122        if !*init {
123            *init = true;
124            let inner = self.signal.lock().unwrap();
125            let signal = inner.lock().unwrap();
126            if signal.is_some() {
127                let tx = signal.as_ref().unwrap().tx.clone();
128                *producer = Some(tx);
129            }
130        }
131        let result = self.tx.send(data);
132        if let Some(signal) = &*producer {
133            let _ = signal.send(self.id);
134        }
135        return result;
136    }
137}
138
139impl<T> Receiver<T> {
140    /// channel id
141    pub fn id(&self) -> i32 {
142        self.id
143    }
144
145    pub fn recv(&self) -> Result<T, RecvError> {
146        self.rx.recv()
147    }
148
149    pub fn recv_timeout(
150        &self,
151        timeout: Duration,
152    ) -> Result<T, crossbeam::channel::RecvTimeoutError> {
153        self.rx.recv_timeout(timeout)
154    }
155
156    pub fn try_recv(&self) -> Result<T, TryRecvError> {
157        self.rx.try_recv()
158    }
159
160    pub fn len(&self) -> usize {
161        self.rx.len()
162    }
163}
164
165pub trait Pollable {
166    /// shared signal channel
167    fn signal(&self) -> ArcMutex2<OptionSignal>;
168    /// channel id
169    fn id(&self) -> i32;
170}
171
172impl<T> Pollable for Receiver<T> {
173    fn signal(&self) -> ArcMutex2<OptionSignal> {
174        self.signal.clone()
175    }
176
177    fn id(&self) -> i32 {
178        self.id
179    }
180}
181
182pub struct Poll {
183    signal: ArcMutex<OptionSignal>,
184}
185
186impl Poll {
187    pub fn new() -> Self {
188        let instance = Signal::new();
189        let inner = Arc::new(Mutex::new(Some(instance)));
190        Self { signal: inner }
191    }
192
193    /// Append list of receivers
194    pub fn append<T: Pollable>(&self, receivers: &[&T]) {
195        for i in receivers {
196            self.add(*i);
197        }
198    }
199
200    /// Add single receiver
201    pub fn add<T: Pollable>(&self, receiver: &T) {
202        let outer = receiver.signal();
203        let mut inner = outer.lock().unwrap();
204        *inner = self.signal.clone();
205    }
206
207    /// Poll with decimal seconds timeout, return channel id, -1 for timeout.
208    pub fn poll(&self, timeout: f32) -> i32 {
209        let timeout = Duration::from_nanos((timeout * 1e9) as u64);
210        // single reader
211        let signal = self.signal.lock().unwrap();
212        signal
213            .as_ref()
214            .unwrap()
215            .rx
216            .recv_timeout(timeout)
217            .unwrap_or(-1)
218    }
219}