1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
use std::cell::Cell;
use std::sync::{Arc,Mutex};

extern crate bus;
use bus::{Bus,BusReader};

#[cfg(test)]
mod tests;

/// Prerequisite trait for any struct to be the 'wrapped' value of a `TcReader`.
///
/// `M` is the message type. A `TcReader<T, M>` calls `take_message` on its
/// local `T` once for every message it receives.
pub trait TakesMessage<M> {
    /// Applies the message `t` to `self`.
    ///
    /// The message is only borrowed; the implementor mutates its own state
    /// in whatever way the message calls for.
    fn take_message(&mut self, t: &M);
}

/// A set of connected `TcWriter<M>`s has any number (initially 0) of reading
/// `TcReader<T,M>` objects for any types `T` (typically the same type `T`).
///
/// This is particularly useful when:
///
/// - The data has the set of writers W and readers R, where W != R
/// - The speed of reads is more vital than the speed of writes,
///   e.g. writes are very rare
/// - It's OK if read states are slightly stale
///
/// It also has the nice properties of:
///
/// - Granular control of reader-synchronization events
/// - Joining and leaving of writers whenever (using `TcWriter::clone`)
/// - Joining and leaving of readers whenever (using `TcWriter::add_reader`)
/// - Both blocking and non-blocking write options
/// - A reader can be unwrapped to return its `T` state
///
/// The implementation allows readers to be initialized with not only different
/// local states, but even initial states of different types (so long as they
/// all implement `TakesMessage<M>` for the same `M` as the writer(s)). It's
/// not clear to me how this option might be useful, but it costs nothing to
/// support, so why not.
///
/// `TcWriter` is implemented as a wrapper over a `bus::Bus`, with 'write'
/// messages being broadcast to all readers, and readers explicitly accepting
/// messages and applying them to their local `T` state.
///
/// See the tests for some commented examples.
pub struct TcWriter<M>
where M: Sync + Clone {
    // Shared, mutex-guarded handle to the underlying bus. Cloning a writer
    // clones this `Arc`, so all clones broadcast on the same bus.
    producer: Arc<Mutex<Bus<M>>>,
}

impl<M> TcWriter<M>
where M: Sync + Clone {

    /// Constructs a new `TcWriter<M>` backed by a freshly created `bus::Bus`.
    ///
    /// `capacity` is passed unchanged to `bus::Bus::new`.
    #[inline]
    pub fn new(capacity: usize) -> Self {
        TcWriter { producer: Arc::new(Mutex::new(Bus::new(capacity))) }
    }

    /// Broadcasts the given `M` message to readers. Blocks until there is
    /// space on the bus. Has the same semantics as the underlying
    /// `bus::Bus::broadcast`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the bus is poisoned.
    pub fn apply_change(&self, m: M) {
        self.producer.lock().unwrap().broadcast(m)
    }

    /// Broadcasts the given `M` message to readers, returning immediately if
    /// the bus is full. Has the same semantics as the underlying
    /// `bus::Bus::try_broadcast`.
    ///
    /// # Errors
    ///
    /// Hands the unsent message back as `Err(m)` when `try_broadcast`
    /// rejects it.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the bus is poisoned.
    pub fn try_apply_change(&self, m: M) -> Result<(), M> {
        // `try_broadcast` already yields `Result<(), M>`; forward it as-is.
        self.producer.lock().unwrap().try_broadcast(m)
    }

    /// Creates, registers and returns a new reader on the underlying bus.
    ///
    /// The reader starts from `init` and maintains its own local `T` state
    /// from then on.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the bus is poisoned.
    pub fn add_reader<T: TakesMessage<M>>(&self, init: T) -> TcReader<T, M> {
        TcReader {
            data: Cell::new(init),
            consumer: self.producer.lock().unwrap().add_rx(),
        }
    }
}

impl<M> Clone for TcWriter<M>
where M: Sync + Clone {
    /// Returns another writer handle that shares this writer's underlying
    /// bus.
    fn clone(&self) -> Self {
        let producer = Arc::clone(&self.producer);
        TcWriter { producer }
    }

    /// Re-points `self` at the bus shared by `source`, dropping the handle
    /// `self` held before.
    fn clone_from(&mut self, source: &Self) {
        let shared = Arc::clone(&source.producer);
        self.producer = shared;
    }
}


/// `TcReader<T,M>` maintains its local `T` object. The reader receives and
/// applies incoming `M` messages to its `T` only when explicitly asked to,
/// via `update`, `update_return`, `update_limited` or
/// `update_return_limited`.
///
/// Access to the local copy is granted through `get_mut`, which does not
/// itself receive pending messages, so the state it exposes may be stale
/// until one of the `update*` methods has been called. Without any messages,
/// this local access is very fast. The reader can also be consumed with
/// `into_inner` to unwrap the local `T`.
///
/// NOTE(review): earlier documentation referred to a `get_mut_fresh` method
/// and a `stale`/`fresh` naming convention intended to make forgetting a
/// crucial update less likely; neither is visible in this file — confirm
/// whether they still exist elsewhere.
pub struct TcReader<T,M>
where T: TakesMessage<M> {
    // The reader's local `T` state, to which received messages are applied.
    data: Cell<T>,
    // Receiving end registered on the writer's bus.
    consumer: BusReader<M>,
}

impl<T,M> TcReader<T,M>
where T: TakesMessage<M>,
      M: Sync + Clone {

    /// Receives all waiting messages and applies them to the local object.
    ///
    /// Stops as soon as `try_recv` on the bus reader fails. Returns the
    /// number of messages received and applied.
    pub fn update(&mut self) -> usize {
        let mut count = 0;
        while self.recv_and_apply().is_some() {
            count += 1;
        }
        count
    }

    /// Receives all waiting messages and applies them to the local object.
    ///
    /// Collects the applied messages, in the order they were received, and
    /// returns them in a vector.
    pub fn update_return(&mut self) -> Vec<M> {
        let mut applied = Vec::new();
        while let Some(msg) = self.recv_and_apply() {
            applied.push(msg);
        }
        applied
    }

    /// Receives any waiting messages up to a limit.
    ///
    /// At most `limit` messages are taken off the bus. Returns the number of
    /// messages received and applied.
    pub fn update_limited(&mut self, limit: usize) -> usize {
        let mut count = 0;
        // The limit is checked first so that no message is taken off the bus
        // once the limit has been reached.
        while count < limit && self.recv_and_apply().is_some() {
            count += 1;
        }
        count
    }

    /// Combination of `update_return` and `update_limited`.
    ///
    /// Receives and applies at most `limit` waiting messages and returns
    /// them, in the order they were received, in a vector.
    pub fn update_return_limited(&mut self, limit: usize) -> Vec<M> {
        let mut applied = Vec::new();
        while applied.len() < limit {
            match self.recv_and_apply() {
                Some(msg) => applied.push(msg),
                None => break,
            }
        }
        applied
    }

    /// Returns a mutable reference to the current version of the trailing
    /// `T`.
    ///
    /// Does not receive pending messages; call one of the `update*` methods
    /// first if a fresh state is needed.
    pub fn get_mut(&mut self) -> &mut T {
        self.data.get_mut()
    }

    /// Consumes the reader, returning the current version of the trailing
    /// `T`.
    pub fn into_inner(self) -> T {
        self.data.into_inner()
    }

    /// Tries to receive one message without waiting. On success the message
    /// is applied to the local `T` and handed back; `None` means `try_recv`
    /// reported an error.
    #[inline]
    fn recv_and_apply(&mut self) -> Option<M> {
        match self.consumer.try_recv() {
            Ok(msg) => {
                self.apply_given(&msg);
                Some(msg)
            }
            Err(_) => None,
        }
    }

    /// Applies a single message to the local `T`.
    #[inline]
    fn apply_given(&mut self, msg: &M) {
        // `&mut self` already guarantees exclusive access, so the safe
        // `Cell::get_mut` suffices; no raw-pointer dereference is needed.
        self.data.get_mut().take_message(msg);
    }
}