1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

type AtomicFlag = AtomicUsize;

/// Struct to force the alignment of the stored data match the typical size of a cache-line
/// to avoid false sharing.
#[repr(align(64))]
struct AlignedData<T>(T);

/// Triple buffer that uses atomic operations to rotate the 3 buffers during consume/produce operations
struct TripleBuffer<T> {
    buffers: UnsafeCell<[AlignedData<T>; 3]>,

    // flag bits:
    // newWrite   = (flags & 0x40)
    // produceIndex = (flags & 0x30) >> 4       buffer to be produced, write to
    // intermediateIndex = (flags & 0xC) >> 2   intermediate buffer (transit zone)
    // consumeIndex  = (flags & 0x3)            buffer to consume, consume from
    flags: AtomicFlag,
}

unsafe impl<T> Sync for TripleBuffer<T> {}

impl<T: Default> TripleBuffer<T> {
    pub fn new() -> TripleBuffer<T> {
        TripleBuffer {
            buffers: UnsafeCell::new([
                AlignedData(Default::default()),
                AlignedData(Default::default()),
                AlignedData(Default::default()),
            ]),
            flags: AtomicFlag::new(0x6),
        }
    }
}

impl<T> TripleBuffer<T> {
    /// Gets the index of the buffer to produce
    fn get_produce_index(&self) -> usize {
        (self.flags.load(Ordering::SeqCst) & 0x30) >> 4
    }

    /// Swaps consume and intermediate buffers and resets the new flag.
    /// If a the new flag was set, the index to the (new) consume buffer is returned, otherwise Err
    /// is returned.
    /// Index of the produce buffer is not modified.
    fn try_get_consume_index(&self) -> Result<usize, ()> {
        let mut old_flags = self.flags.load(Ordering::Acquire);
        let mut new_flags: usize;
        loop {
            if (old_flags & 0x40) == 0 {
                // nothing new, no need to swap
                return Err(());
            }
            // clear the "new" bit and swap the indices of consume and intermediate buffers
            new_flags = (old_flags & 0x30) | ((old_flags & 0x3) << 2) | ((old_flags & 0xC) >> 2);

            match self
                .flags
                .compare_exchange(old_flags, new_flags, Ordering::SeqCst, Ordering::Relaxed)
            {
                Ok(_) => break,
                Err(x) => old_flags = x,
            }
        }
        Ok(new_flags & 0x3)
    }

    /// Swaps intermediate and (new)produced buffers and sets the new flag.
    /// Index of the consume buffer is not modified.
    fn set_produce(&self) {
        let mut old_flags = self.flags.load(Ordering::Acquire);
        loop {
            // set the "new" bit and swap the indices of produce and intermediate buffers
            let new_flags = 0x40 | ((old_flags & 0xC) << 2) | ((old_flags & 0x30) >> 2) | (old_flags & 0x3);

            match self
                .flags
                .compare_exchange(old_flags, new_flags, Ordering::SeqCst, Ordering::Relaxed)
            {
                Ok(_) => break,
                Err(x) => old_flags = x,
            }
        }
    }
}

/// Sender part of the communication.
pub struct Sender<T>(Arc<TripleBuffer<T>>);

// The receiver can be sent from place to place, so long as it
// is not used to receive non-sendable things.
unsafe impl<T: Send> Send for Sender<T> {}

//impl<T> !Sync for Sender<T> { }

impl<T> Sender<T> {
    fn new(owner: &Arc<TripleBuffer<T>>) -> Sender<T> {
        Sender(owner.clone())
    }

    pub fn send_buffer(&self) -> Result<RefSendBuffer<T>, ()> {
        Ok(RefSendBuffer(&self.0, self.0.get_produce_index()))
    }
}

impl<T: Copy> Sender<T> {
    pub fn send(&self, value: T) -> Result<(), ()> {
        match self.send_buffer() {
            Ok(mut b) => {
                *b = value;
                Ok(())
            }
            Err(_) => Err(()),
        }
    }
}

/// Reference to the buffer held by the producer
pub struct RefSendBuffer<'a, T: 'a>(&'a TripleBuffer<T>, usize);

impl<'a, T> Drop for RefSendBuffer<'a, T> {
    fn drop(&mut self) {
        self.0.set_produce();
    }
}

impl<'a, T> Deref for RefSendBuffer<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &(*self.0.buffers.get())[self.1].0 }
    }
}

impl<'a, T> DerefMut for RefSendBuffer<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut (*self.0.buffers.get())[self.1].0 }
    }
}

/// Receiver part of the communication
pub struct Receiver<T>(Arc<TripleBuffer<T>>);

// The consumer can be sent from place to place, so long as it
// is not used to receive non-sendable things.
unsafe impl<T: Send> Send for Receiver<T> {}

//impl<T> !Sync for Receiver<T> { }

impl<T> Receiver<T> {
    fn new(owner: &Arc<TripleBuffer<T>>) -> Receiver<T> {
        Receiver(owner.clone())
    }

    pub fn receive_buffer(&self) -> Result<RefReceiveBuffer<T>, ()> {
        match self.0.try_get_consume_index() {
            Ok(idx) => Ok(RefReceiveBuffer(&self.0, idx)),
            Err(_) => Err(()),
        }
    }
}

impl<T: Copy> Receiver<T> {
    pub fn receive(&self) -> Result<T, ()> {
        match self.receive_buffer() {
            Ok(b) => Ok(*b),
            Err(_) => Err(()),
        }
    }
}

/// Reference to the buffer held by the consumer
pub struct RefReceiveBuffer<'a, T: 'a>(&'a TripleBuffer<T>, usize);

impl<'a, T> Deref for RefReceiveBuffer<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &(*self.0.buffers.get())[self.1].0 }
    }
}

impl<'a, T> DerefMut for RefReceiveBuffer<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut (*self.0.buffers.get())[self.1].0 }
    }
}

/// Create a Sender/Receiver with an embedded shared buffer for communication.
/// It is not a "Single Producer Single Consumer" queue as some massages can be dropped based
/// on thread scheduling.
pub fn state_channel<T: Default>() -> (Sender<T>, Receiver<T>) {
    let a = Arc::new(TripleBuffer::new());
    (Sender::new(&a), Receiver::new(&a))
}