trailing_cell/lib.rs
1use std::sync::{Arc,Mutex};
2
3extern crate bus;
4use bus::{Bus,BusReader};
5
6#[cfg(test)]
7mod tests;
8
/// Contract a type must fulfil to be the locally held ('wrapped') value of a
/// `TcReader`.
///
/// `M` is the message type. The implementor decides how a message changes its
/// own state.
pub trait TakesMessage<M> {
    /// Applies a single message to `self`. The message is only borrowed.
    fn take_message(&mut self, msg: &M);
}
13
14/// A set of connected `TcWriter<M>`s have any number (initially 0) of reading
15/// TcWriter<T,M> objects for any types `T` (typically the same type T).
16/// The data within some reader `r` can be then accessed by dereferencing `*r`.
17///
18/// This is particularly useful when:
19/// - The data has the set of writers W and readers R, where W != R
20/// - The speed of reads is more vital than the speed of writes.
21/// eg: Writes are very rare
22/// - Its OK if read states are slightly stale
23///
24/// It also has with it the nice properties of:
25/// - Granular control of reader-synchronization events
26/// - joining and leaving of writers whenever (using TcWriter::clone)
27/// - joining and leaving of readers whenever (using TcWriter::add_reader)
28/// - both blocking and non-blocking write options
29/// - a reader can be unwrapped to return their T state.
30///
31///
32/// The implementation allows readers to be initialized with not only different
33/// local states, but even initial states of different types, (so long as they
34/// all implement TakesMessage<M> for the same M as the writer(s)). It's not
35/// clear to me how this option might be useful, but it costs nothing to
36/// support, so why not.
37///
38/// TcWriter is implemented as a wrapper over a `bus::Bus`, with 'write'
39/// messages being broadcast to all readers, and readers explicitly accepting
40/// messages and applying them to their local T state.
41///
42/// See the tests for some commented examples
43pub struct TcWriter<M>
44where M: Sync + Clone {
45 producer: Arc<Mutex<Bus<M>>>,
46}
47
48impl<M> TcWriter<M>
49where M: Sync + Clone {
50
51 /// Constructs a new `TcWriter<T,D>`.
52 /// Facilitates mutation of the wrapped T object.
53 #[inline]
54 pub fn new(capacity: usize) -> Self {
55 TcWriter { producer: Arc::new(Mutex::new(Bus::new(capacity))), }
56 }
57
58 /// Broadcasts the given D message to readers. Blocks until there is space
59 /// on bus. has the same semantics as underlying `bus::Bus::broadcast`
60 pub fn apply_change(&self, m: M) {
61 self.producer.lock().unwrap().broadcast(m)
62 }
63
64 /// Broadcasts the given D message to readers, returns immediately if bus is
65 /// full. Has the same semantics as underlying `bus::Bus::try_broadcast`
66 pub fn try_apply_change(&self, m: M) -> Result<(), M> {
67 if let Err(m) = self.producer.lock().unwrap().try_broadcast(m) {
68 Err(m)
69 } else {
70 Ok(())
71 }
72 }
73
74 /// Creates, registers and returns a new reader object to the underlying `T`
75 /// The reader will maintain its own state
76 pub fn add_reader<T: TakesMessage<M>>(&self, init: T) -> TcReader<T, M> {
77 TcReader {
78 data: init,
79 consumer: self.producer.lock().unwrap().add_rx()
80 }
81 }
82}
83
84impl<M> Clone for TcWriter<M>
85where M: Sync + Clone {
86 fn clone(&self) -> Self {
87 TcWriter { producer: self.producer.clone(), }
88 }
89
90 fn clone_from(&mut self, source: &Self) {
91 self.producer = source.producer.clone();
92 }
93}
94
95
96/// `TcReader<T,M>` maintains its local `T` object. The reader will receive and
97/// apply incoming `M` messages to its T whenever explicitly invoked by
98/// `update`, `update_limited` or implicity by `get_mut_fresh`.
99///
100/// Access to the local copy is granted through the two `get_mut` variants.
101/// Without any messages, this local access is very fast. The reader can also be
102/// consumed to unwrap the local `T`.
103///
104/// The very explicit convention of using `stale` and `fresh` everywhere is to
105/// make unintentionally forgetting a crucial update less likely.
106pub struct TcReader<T,M>
107where T: TakesMessage<M> {
108 data: T,
109 consumer: BusReader<M>,
110}
111
112impl<T,M> TcReader<T,M>
113where T: TakesMessage<M>,
114 M: Sync + Clone {
115
116 /// Receives all waiting messages and applies them to the local object.
117 pub fn update(&mut self) -> usize {
118 let mut count = 0;
119 while let Ok(msg) = self.consumer.try_recv() {
120 self.apply_given(&msg);
121 count += 1
122 }
123 count
124 }
125
126 /// Receives any waiting messages up to a limit. collects and returns these
127 /// messages in a vector.
128 pub fn update_return(&mut self) -> Vec<M> {
129 let mut v = vec![];
130 while let Ok(msg) = self.consumer.try_recv() {
131 self.apply_given(&msg);
132 v.push(msg);
133 }
134 v
135 }
136
137 /// Receives any waiting messages up to a limit.
138 /// Returns number of messages received and applied.
139 pub fn update_limited(&mut self, limit: usize) -> usize {
140 let mut count = 0;
141 for _ in 0..limit {
142 if let Ok(msg) = self.consumer.try_recv() {
143 self.apply_given(&msg);
144 count += 1;
145 } else { break }
146 }
147 count
148 }
149
150 /// combination of `update_return` and `update_limited`
151 pub fn update_return_limited(&mut self, limit: usize) -> Vec<M> {
152 let mut v = vec![];
153 for _ in 0..limit {
154 if let Ok(msg) = self.consumer.try_recv() {
155 self.apply_given(&msg);
156 v.push(msg);
157 } else { break }
158 }
159 v
160 }
161
162 /// Consumes the reader, returning the current version of the trailing `T`.
163 pub fn into_inner(self) -> T {
164 self.data
165 }
166
167 #[inline]
168 fn apply_given(&mut self, msg: &M) {
169 self.data.take_message(&msg);
170 }
171}
172
173impl<T,M> std::ops::Deref for TcReader<T,M>
174where T: TakesMessage<M> {
175 type Target = T;
176
177 fn deref(&self) -> &T {
178 &self.data
179 }
180}
181
182impl<T,M> std::ops::DerefMut for TcReader<T,M>
183where T: TakesMessage<M> {
184 fn deref_mut(&mut self) -> &mut T {
185 &mut self.data
186 }
187}
188