trailing_cell/
lib.rs

1use std::sync::{Arc,Mutex};
2
3extern crate bus;
4use bus::{Bus,BusReader};
5
6#[cfg(test)]
7mod tests;
8
9/// Prerequisite trait for any struct to be the 'wrapped' value of a TcReader
10pub trait TakesMessage<M> {
11    fn take_message(&mut self, t: &M);
12}
13
14/// A set of connected `TcWriter<M>`s have any number (initially 0) of reading
15/// TcWriter<T,M> objects for any types `T` (typically the same type T).
16/// The data within some reader `r` can be then accessed by dereferencing `*r`.
17///
18/// This is particularly useful when:
19///     - The data has the set of writers W and readers R, where W != R
20///     - The speed of reads is more vital than the speed of writes.
21///       eg: Writes are very rare
22///     - Its OK if read states are slightly stale 
23///
24/// It also has with it the nice properties of:
25///     - Granular control of reader-synchronization events
26///     - joining and leaving of writers whenever (using TcWriter::clone)
27///     - joining and leaving of readers whenever (using TcWriter::add_reader)
28///     - both blocking and non-blocking write options
29///     - a reader can be unwrapped to return their T state.
30/// 
31///
32/// The implementation allows readers to be initialized with not only different
33/// local states, but even initial states of different types, (so long as they
34/// all implement TakesMessage<M> for the same M as the writer(s)). It's not
35/// clear to me how this option might be useful, but it costs nothing to
36/// support, so why not.
37///
38/// TcWriter is implemented as a wrapper over a `bus::Bus`, with 'write'
39/// messages being broadcast to all readers, and readers explicitly accepting
40/// messages and applying them to their local T state.
41///
42/// See the tests for some commented examples
43pub struct TcWriter<M>
44where M: Sync + Clone {
45    producer: Arc<Mutex<Bus<M>>>,
46}
47
48impl<M> TcWriter<M>
49where M: Sync + Clone {
50
51    /// Constructs a new `TcWriter<T,D>`.
52    /// Facilitates mutation of the wrapped T object.
53    #[inline]
54    pub fn new(capacity: usize) -> Self {
55        TcWriter { producer: Arc::new(Mutex::new(Bus::new(capacity))), }
56    }
57
58    /// Broadcasts the given D message to readers. Blocks until there is space
59    /// on bus. has the same semantics as underlying `bus::Bus::broadcast`
60    pub fn apply_change(&self, m: M) {
61        self.producer.lock().unwrap().broadcast(m)
62    }
63
64    /// Broadcasts the given D message to readers, returns immediately if bus is
65    /// full. Has the same semantics as underlying `bus::Bus::try_broadcast`
66    pub fn try_apply_change(&self, m: M) -> Result<(), M> {
67        if let Err(m) = self.producer.lock().unwrap().try_broadcast(m) {
68            Err(m)
69        } else {
70            Ok(())
71        }
72    }
73
74    /// Creates, registers and returns a new reader object to the underlying `T`
75    /// The reader will maintain its own state
76    pub fn add_reader<T: TakesMessage<M>>(&self, init: T) -> TcReader<T, M> {
77        TcReader {
78            data: init,
79            consumer: self.producer.lock().unwrap().add_rx()
80        }
81    }
82}
83
84impl<M> Clone for TcWriter<M>
85where M: Sync + Clone {
86    fn clone(&self) -> Self {
87        TcWriter { producer: self.producer.clone(), }
88    }
89
90    fn clone_from(&mut self, source: &Self) {
91        self.producer = source.producer.clone();
92    }
93}
94
95
96/// `TcReader<T,M>` maintains its local `T` object. The reader will receive and 
97/// apply incoming `M` messages to its T whenever explicitly invoked by
98/// `update`, `update_limited` or implicity by `get_mut_fresh`.
99/// 
100/// Access to the local copy is granted through the two `get_mut` variants.
101/// Without any messages, this local access is very fast. The reader can also be
102/// consumed to unwrap the local `T`.
103/// 
104/// The very explicit convention of using `stale` and `fresh` everywhere is to
105/// make unintentionally forgetting a crucial update less likely.
106pub struct TcReader<T,M>
107where T: TakesMessage<M> {
108    data: T,
109    consumer: BusReader<M>,
110}
111
112impl<T,M> TcReader<T,M>
113where T: TakesMessage<M>,
114      M: Sync + Clone {
115    
116    /// Receives all waiting messages and applies them to the local object.
117    pub fn update(&mut self) -> usize {
118        let mut count = 0;
119        while let Ok(msg) = self.consumer.try_recv() {
120            self.apply_given(&msg);
121            count += 1
122        }
123        count
124    }
125
126    /// Receives any waiting messages up to a limit. collects and returns these
127    /// messages in a vector.
128    pub fn update_return(&mut self) -> Vec<M> {
129        let mut v = vec![];
130        while let Ok(msg) = self.consumer.try_recv() {
131            self.apply_given(&msg);
132            v.push(msg);
133        }
134        v
135    }
136
137    /// Receives any waiting messages up to a limit.
138    /// Returns number of messages received and applied.
139    pub fn update_limited(&mut self, limit: usize) -> usize {
140        let mut count = 0;
141        for _ in 0..limit {
142            if let Ok(msg) = self.consumer.try_recv() {
143                self.apply_given(&msg);
144                count += 1;
145            } else { break }
146        }
147        count
148    }
149    
150    /// combination of `update_return` and `update_limited`
151    pub fn update_return_limited(&mut self, limit: usize) -> Vec<M> {
152        let mut v = vec![];
153        for _ in 0..limit {
154            if let Ok(msg) = self.consumer.try_recv() {
155                self.apply_given(&msg);
156                v.push(msg);
157            } else { break }
158        }
159        v
160    }
161
162    /// Consumes the reader, returning the current version of the trailing `T`.
163    pub fn into_inner(self) -> T {
164        self.data
165    }
166
167    #[inline]
168    fn apply_given(&mut self, msg: &M) {
169        self.data.take_message(&msg);
170    }
171}
172
173impl<T,M> std::ops::Deref for TcReader<T,M>
174where T: TakesMessage<M> {
175    type Target = T;
176
177    fn deref(&self) -> &T {
178        &self.data
179    }
180}
181
182impl<T,M> std::ops::DerefMut for TcReader<T,M>
183where T: TakesMessage<M> {
184    fn deref_mut(&mut self) -> &mut T {
185        &mut self.data
186    }
187}
188