// Timestamped message bus built on std::sync::mpsc channels.
use std::sync::mpsc;
use std::sync::mpsc::{Sender, Receiver};

// A message bus which buffers messages and sends them at the correct timestamp
// When a message is sent it is sent with a timestamp Message::<valuetype>::new(value, receive_time)
// The message will not be received by anything until bus.tick(time) where time >= receive_time given above
// When receive_time has passed the message will be sent to bus.output_receiver
pub struct Bus<T> {
    pub sender: Sender<Message<T>>,
    input_receiver: Receiver<Message<T>>,

    output_sender: Sender<T>,
    pub receiver: Receiver<T>,

    buffers: [Vec<Message<T>>; 32]
}

impl<T> Bus<T> {
    pub fn new() -> Bus<T> {
        //Input channel
        let (itx, irx) = mpsc::channel();

        //output channel
        let (otx, orx) = mpsc::channel();

        return Bus {
            sender: itx,
            input_receiver: irx,

            output_sender: otx,
            receiver: orx,

            //A set of buffers for messages in the future
            //Each buffer will be checked every 2^index ticks, messages are put into the farthest buffer possible
            //This messages messages will be shuffled log2(number of ticks into the future) times
            buffers: [
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
                Vec::<Message<T>>::new(),
            ]
        };
    }

    pub fn tick(&mut self, time : u64) {
        // Pump input messages, either into buffers or straight into the output
        loop {
            match self.input_receiver.try_recv() {
                Ok(v) => {
                    if v.time <= time {
                        self.output_sender.send(v.message).unwrap();;
                    } else {
                        self.buffers[buffer_index_for_time_offset(v.time - time)].push(v);
                    }
                },
                Err(_) => { break; }
            }
        }

        // Check buffers
        for buffer_index in 0 .. self.buffers.len() {
            if time % ((buffer_index + 1) as u64) == 0 {

                //Iterate through buffer, pumping appropriate messages
                let size = self.buffers[buffer_index].len();
                for j in 0 .. size {

                    //Index of the message in the buffer (we need to count backwards, since we're modifying the buffer as we step through it)
                    let msg_index = size - j - 1;

                    //timestamp of the message
                    let msg_time = self.buffers[buffer_index][msg_index].time;

                    //Either send this message, or move it to a lower buffer
                    if msg_time <= time {
                        self.output_sender.send(self.buffers[buffer_index].swap_remove(msg_index).message).unwrap();
                    } else {
                        //Calculate which buffer the message should be in, given the time offset
                        let move_to_buffer_index = buffer_index_for_time_offset(msg_time - time);
                        if move_to_buffer_index != buffer_index {
                            let msg = self.buffers[buffer_index].swap_remove(msg_index);
                            self.buffers[move_to_buffer_index].push(msg);
                        }
                    }
                }
            }
        }
    }
}

/// Returns the buffer index for a message that is `offset` ticks in the future.
///
/// The result is the base-2 logarithm of `offset` rounded up: offsets 0 and 1 map to 0,
/// and an offset in `(2^(k-1), 2^k]` maps to `k`.
///
/// Offsets above `2^63` have no next power of two representable in a `u64`; they map
/// to 64 instead of overflowing. The result can therefore be as large as 64, so it is
/// not guaranteed to be a valid index into a smaller buffer array.
fn buffer_index_for_time_offset(offset : u64) -> usize {
    match offset.checked_next_power_of_two() {
        Some(power) => power.trailing_zeros() as usize,
        // The next power of two would be 2^64
        None => 64,
    }
}

/// A payload paired with the timestamp it was submitted with.
pub struct Message<T> {
    /// The payload carried by this message.
    message: T,
    /// The timestamp attached to the payload.
    time: u64,
}

impl<T> Message<T> {
    /// Wraps `message` together with the timestamp `time`.
    pub fn new(message: T, time: u64) -> Message<T> {
        Message { message, time }
    }
}

#[test]
fn check_buffer_index_for_times_is_calculated_correctly() {
    // (time offset, expected buffer index) pairs
    let expectations: [(u64, usize); 11] = [
        (0, 0),
        (1, 0),
        (2, 1),
        (3, 2),
        (4, 2),
        (5, 3),
        (100, 7),
        (500, 9),
        (750, 10),
        (1000, 10),
        (1025, 11),
    ];

    for &(offset, expected_index) in expectations.iter() {
        assert_eq!(buffer_index_for_time_offset(offset), expected_index);
    }
}

#[test]
fn check_messages_are_returned_at_correct_time() {
    // Number of consecutive ticks to simulate, and messages queued for each of them
    const TICKS: u64 = 1000;
    const MESSAGES_PER_TICK: usize = 3;

    //Create a bus
    let mut bus = Bus::<u64>::new();

    //Send messages down bus, each message contains the time it should be received
    for due in 0..TICKS {
        for _ in 0..MESSAGES_PER_TICK {
            bus.sender.send(Message::<u64>::new(due, due)).unwrap();
        }
    }

    for now in 0..TICKS {
        //Tick, publishing received messages
        bus.tick(now);

        //Receive messages from the bus, checking each one was due on this tick
        let mut delivered = 0;
        while let Ok(v) = bus.receiver.try_recv() {
            assert_eq!(v, now);
            delivered += 1;
        }

        //Without this count the test would also pass if the bus never delivered anything
        assert_eq!(delivered, MESSAGES_PER_TICK);
    }
}