1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use channel::Channel;
use command::Command;
use relay::Relay;
use scheduler::DEFAULT_SCHEDULER_PRECISION;
use scheduler::ScheduledEvent;
use std::collections::LinkedList;
use std::sync::mpsc::TryRecvError;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use types::BoxedEvent;


pub struct Dispatcher<'a, Context: 'a + Send> {
    event_channel: Channel<BoxedEvent<Context>>,
    command_channel: Channel<Command<Context>>,
    relay: Relay<Context>,
    context: &'a mut Context,

    running: bool,

    // Scheduling
    scheduled_events: LinkedList<ScheduledEvent<Context>>,
    scheduling_precision: Duration,
    scheduling_timer: Instant,
}

impl<'a, Context: 'a + Send> Dispatcher<'a, Context> {
    pub fn new(context: &mut Context) -> Dispatcher<Context> {
        let event_channel = Channel::<BoxedEvent<Context>>::new();
        let command_channel = Channel::<Command<Context>>::new();

        let relay = Relay::new(
            event_channel.get_sender().clone(),
            command_channel.get_sender().clone(),
        );

        Dispatcher {
            event_channel,
            command_channel,
            relay,
            context,
            running: false,
            scheduled_events: LinkedList::new(),
            scheduling_precision: Duration::from_millis(DEFAULT_SCHEDULER_PRECISION),
            scheduling_timer: Instant::now(),
        }
    }

    pub fn create_relay(&self) -> Relay<Context> {
        self.relay.clone()
    }

    pub fn set_scheduling_precision(&mut self, precision: Duration) { self.scheduling_precision = precision }

    pub fn run(&mut self) {
        self.running = true;

        while self.running {
            let elapsed = self.scheduling_timer.elapsed();

            if elapsed < self.scheduling_precision {
                thread::sleep(self.scheduling_precision - elapsed);
                continue;
            } else {
                self.scheduling_timer = Instant::now();
            }

            while self.has_remaining_loop_time() && self.handle_commands() {}
            self.handle_scheduled_events(elapsed);
            while self.has_remaining_loop_time() && self.handle_events() {}
        }
    }

    #[inline]
    pub fn has_remaining_loop_time(&self) -> bool {
        self.scheduling_timer.elapsed() < self.scheduling_precision
    }

    #[inline]
    fn handle_commands(&mut self) -> bool {
        match self.command_channel.get_receiver().try_recv() {
            Ok(command) => {
                self.process_command(command);
                return true;
            }
            Err(err) => {
                match err {
                    TryRecvError::Empty => (),
                    TryRecvError::Disconnected => {
                        self.running = false;
                        println!("Channel disconnected")
                    }
                };
                return false;
            }
        }
    }

    #[inline]
    fn process_command(&mut self, command: Command<Context>) {
        match command {
            Command::Stop => self.running = false,
            Command::Schedule(event, delay) => self.scheduled_events.push_back(ScheduledEvent::new(event, delay)),
        }
    }

    #[inline]
    fn handle_scheduled_events(&mut self, elapsed: Duration) {
        let sender = self.event_channel.get_sender();

        self.scheduled_events.drain_filter(
            |event| -> bool {
                event.advance(elapsed);
                event.is_ready()
            }
        ).for_each(
            |event|
                sender.send(event.get_event()).unwrap()
        );
    }

    #[inline]
    fn handle_events(&mut self) -> bool {
        match self.event_channel.get_receiver().try_recv() {
            Ok(event) => {
                event.process(&mut self.relay, &mut self.context);
                return true;
            }
            Err(err) => {
                match err {
                    TryRecvError::Empty => (),
                    TryRecvError::Disconnected => {
                        self.running = false;
                        println!("Channel disconnected");
                    }
                };
                return false;
            }
        }
    }
}