1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
use self::traits::*;
use super::*;

use crossbeam::channel::RecvTimeoutError;

// Slab-backed container of machines; a machine's key is its slab index.
type MachineMap = super_slab::SuperSlab<ShareableMachine>;

// The scheduler is responsible for the life-cycle of a machine.
//
// It starts with a machine being built and assigned.
// When it receives messages, the machine is given to the executor
// as a task, by the sender. If the machine SendBlocks, the
// executor sends it back to the scheduler to be re-scheduled.
// When disconnected, the executor sends it back to the scheduler
// to be destroyed.
//
// Some things of note:
// * Crossbeam Deque is the task queue
// * SuperSlab is used as a container of machines.
//

// Tuning for the scheduler, the count is for slab and map index sizing.
#[allow(dead_code)]
#[allow(non_upper_case_globals)]
/// The machine_count_estimate is an estimate for the number of machines
/// that will exist at any point in time. A SuperSlab is used for tracking
/// machines and mis-estimating will cause allocation.
/// The default is 5000 machines.
pub static machine_count_estimate: AtomicCell<usize> = AtomicCell::new(5000);

/// Returns the current machine count estimate used for sizing the
/// scheduler's machine slab.
#[allow(dead_code)]
pub fn get_machine_count_estimate() -> usize {
    machine_count_estimate.load()
}

/// Sets the machine count estimate. The estimate should be set before
/// starting the server, since it sizes the scheduler's machine slab.
#[allow(dead_code)]
pub fn set_machine_count_estimate(new: usize) {
    machine_count_estimate.store(new);
}

/// The selector_maintenance_duration determines how often the selector will yield
/// for maintenance. It will also yield when it has accumulated enough debt to warrant yielding.
#[allow(dead_code, non_upper_case_globals)]
#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
pub static selector_maintenance_duration: AtomicCell<Duration> = AtomicCell::new(Duration::from_millis(500));

/// The get_selector_maintenance_duration function returns the current maintenance duration.
/// Deprecated along with the static it reads; retained for API compatibility.
#[allow(dead_code, non_upper_case_globals, deprecated)]
#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
pub fn get_selector_maintenance_duration() -> Duration { selector_maintenance_duration.load() }

/// The set_selector_maintenance_duration function sets the current maintenance duration.
/// Deprecated along with the static it writes; retained for API compatibility.
#[allow(dead_code, non_upper_case_globals, deprecated)]
#[deprecated(since = "0.1.2", note = "select is no longer used by the scheduler")]
pub fn set_selector_maintenance_duration(new: Duration) { selector_maintenance_duration.store(new); }

/// The live_machine_count is the number of machines in the collective.
/// Incremented on insert, decremented on remove, reset to 0 when a
/// DefaultScheduler is created.
#[allow(dead_code, non_upper_case_globals)]
pub static live_machine_count: AtomicUsize = AtomicUsize::new(0);

/// Returns the number of machines currently in the collective.
#[allow(dead_code, non_upper_case_globals)]
pub fn get_machine_count() -> usize {
    live_machine_count.load(Ordering::SeqCst)
}

/// Statistics for the scheduler, accumulated by the scheduler thread
/// and periodically reported to the monitor.
#[derive(Debug, Default, Copy, Clone)]
pub struct SchedStats {
    // time spent processing commands (accumulated in maintenance())
    pub maint_time: Duration,
    // time spent inserting machines (accumulated in insert_machine())
    pub add_time: Duration,
    // time spent removing machines (accumulated in remove_machine())
    pub remove_time: Duration,
    // total elapsed time of the scheduler thread's run() loop
    pub total_time: Duration,
}

// The default scheduler. It is created by the scheduler factory.
#[allow(dead_code)]
pub struct DefaultScheduler {
    // command channel into the scheduler thread
    sender: SchedSender,
    // clone of the factory's wait-queue injector (see new()); held here,
    // not otherwise used in this file
    wait_queue: SchedTaskInjector,
    // join handle for the scheduler thread; taken and joined on drop
    thread: Option<thread::JoinHandle<()>>,
}
impl DefaultScheduler {
    // Ask the scheduler thread to shut down by sending it a Stop command.
    fn stop(&self) {
        log::info!("stopping scheduler");
        self.sender.send(SchedCmd::Stop).unwrap();
    }
    // Create the scheduler: reset the live-machine counter, spawn the
    // scheduler thread (which takes ownership of the queues), and kick
    // it off with a Start command.
    pub fn new(sender: SchedSender, receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Self {
        live_machine_count.store(0, Ordering::SeqCst);
        // clone the wait queue before the tuple is moved into the thread
        let wait_queue = Arc::clone(&queues.1);
        let join_handle = SchedulerThread::spawn(receiver, monitor, queues);
        sender.send(SchedCmd::Start).unwrap();
        Self {
            sender,
            wait_queue,
            thread: join_handle,
        }
    }
}

impl Scheduler for DefaultScheduler {
    // assign a new machine into the collective
    fn assign_machine(&self, machine: ShareableMachine) { self.sender.send(SchedCmd::New(machine)).unwrap(); }
    // request stats from the executors
    fn request_stats(&self) { self.sender.send(SchedCmd::RequestStats).unwrap(); }
    // request machine info
    fn request_machine_info(&self) { self.sender.send(SchedCmd::RequestMachineInfo).unwrap(); }
    // stop the scheduler. Delegates to the inherent stop() above —
    // inherent methods take precedence over trait methods, so this is
    // not a recursive call.
    fn stop(&self) { self.stop(); }
}

// If we haven't done so already, attempt to stop the scheduler thread
// and synchronize with its shutdown.
impl Drop for DefaultScheduler {
    fn drop(&mut self) {
        if let Some(handle) = self.thread.take() {
            // best-effort: the thread may already be gone, so a failed
            // send is deliberately ignored
            let _ = self.sender.send(SchedCmd::Terminate(false));
            log::info!("synchronizing Scheduler shutdown");
            if handle.join().is_err() {
                log::trace!("failed to join Scheduler thread");
            }
        }
        log::info!("Scheduler shutdown complete");
    }
}

// The scheduler thread. Working through the borrow-checker made
// this an interesting design. At the top we have maintenance
// of the collective, where machines are inserted or removed.
// From there a select list is created for every machine ready
// to receive a command. That layer is responsible for deciding
// which commands it can immediately handle and which need to
// be handled by the outer layer. Then we come to the final
// layer of the scheduler. Where it maintains a secondary select
// list for machines returning from the executor.
//
// NOTE(review): this const appears unused in this file; the deprecation
// notes above say select is no longer used by the scheduler — confirm
// before removing.
const MAX_SELECT_HANDLES: usize = usize::MAX - 16;

#[allow(dead_code)]
// State owned by the scheduler thread; built inside spawn() and driven
// by run().
struct SchedulerThread {
    // command channel from the scheduler front-end and executors
    receiver: SchedReceiver,
    // channel for reporting stats and machine info to the monitor
    monitor: MonitorSender,
    // wait-queue injector (queues.1); held here, not otherwise used in this file
    wait_queue: SchedTaskInjector,
    // run-queue injector (queues.0); ready machines are scheduled onto it
    run_queue: TaskInjector,
    // run() loops while this is true
    is_running: bool,
    // set false at construction; never read in this file — TODO confirm purpose
    is_started: bool,
    // slab of machines, keyed by machine.key
    machines: MachineMap,
}
impl SchedulerThread {
    // start the scheduler thread and call run()
    fn spawn(receiver: SchedReceiver, monitor: MonitorSender, queues: (TaskInjector, SchedTaskInjector)) -> Option<thread::JoinHandle<()>> {
        log::info!("Starting scheduler");
        let thread = std::thread::spawn(move || {
            let mut sched_thread = Self {
                receiver,
                monitor,
                run_queue: queues.0,
                wait_queue: queues.1,
                is_running: true,
                is_started: false,
                machines: MachineMap::with_capacity(get_machine_count_estimate()),
            };
            sched_thread.run();
        });
        Some(thread)
    }

    // This is the top layer, where machines are added or removed.
    // it calls the build select layer.
    fn run(&mut self) {
        log::info!("running schdeuler");
        let mut stats_timer = SimpleEventTimer::default();
        let start = Instant::now();
        let mut stats = SchedStats::default();
        while self.is_running {
            // event check
            if stats_timer.check() && self.monitor.send(MonitorMessage::SchedStats(stats)).is_err() {
                log::debug!("failed to send sched stats to mointor");
            }
            // wait for something to do
            match self.receiver.recv_timeout(stats_timer.remaining()) {
                Ok(cmd) => self.maintenance(cmd, &mut stats),
                Err(RecvTimeoutError::Timeout) => (),
                Err(RecvTimeoutError::Disconnected) => self.is_running = false,
            }
        }
        stats.total_time = start.elapsed();
        log::info!("machines remaining: {}", self.machines.len());
        // Dump the machine info
        for (_, m) in self.machines.iter() {
            log::info!(
                "machine={} key={} state={:#?} q_len={} task_id={} disconnected={}",
                m.get_id(),
                m.get_key(),
                m.get_state(),
                m.channel_len(),
                m.get_task_id(),
                m.is_disconnected()
            );
        }
        log::info!("{:#?}", stats);
        log::info!("completed running schdeuler");
    }

    fn maintenance(&mut self, cmd: SchedCmd, stats: &mut SchedStats) {
        let t = Instant::now();
        match cmd {
            SchedCmd::Start => (),
            SchedCmd::Stop => self.is_running = false,
            SchedCmd::Terminate(_key) => (),
            SchedCmd::New(machine) => self.insert_machine(machine, stats),
            SchedCmd::SendComplete(key) => self.schedule_sendblock_machine(key),
            SchedCmd::Remove(key) => self.remove_machine(key, stats),
            SchedCmd::RecvBlock(key) => self.schedule_recvblock_machine(key),
            SchedCmd::RequestStats => if self.monitor.send(MonitorMessage::SchedStats(*stats)).is_err() {},
            SchedCmd::RequestMachineInfo => self.send_machine_info(),
            _ => (),
        };
        stats.maint_time += t.elapsed();
    }
    // insert a machine into the machines map, this is where the machine.key is set
    fn insert_machine(&mut self, machine: ShareableMachine, stats: &mut SchedStats) {
        let t = Instant::now();
        let entry = self.machines.vacant_entry();
        log::trace!("inserted machine {} key={}", machine.get_id(), entry.key());
        machine.key.store(entry.key(), Ordering::SeqCst);
        entry.insert(Arc::clone(&machine));

        // Always run a task, on insertion so that the machine gets the one-time
        // connection notification. Otherwise, it would have to wait for a send.
        if let Err(state) = machine.compare_and_exchange_state(MachineState::New, MachineState::Ready) {
            log::error!("insert_machine: expected state New, found state {:#?}", state);
        }
        live_machine_count.fetch_add(1, Ordering::SeqCst);
        schedule_machine(&machine, &self.run_queue);
        stats.add_time += t.elapsed();
    }

    fn remove_machine(&mut self, key: usize, stats: &mut SchedStats) {
        let t = Instant::now();
        // Believe it or not, this remove is a huge performance hit to
        // the scheduler. It results a whole bunch of drops being run.
        if let Some(machine) = self.machines.get(key) {
            log::info!(
                "removed machine {} key={} task={}",
                machine.get_id(),
                machine.get_key(),
                machine.get_task_id()
            );
        } else {
            log::warn!("machine key {} not in collective", key);
            stats.remove_time += t.elapsed();
            return;
        }
        self.machines.remove(key);
        live_machine_count.fetch_sub(1, Ordering::SeqCst);
        stats.remove_time += t.elapsed();
    }

    fn send_machine_info(&self) {
        for (_, m) in &self.machines {
            if self.monitor.send(MonitorMessage::MachineInfo(Arc::clone(m))).is_err() {
                log::debug!("unable to send machine info to monitor");
            }
        }
    }

    fn run_task(&self, machine: &ShareableMachine) {
        if let Err(state) = machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
            if state != MachineState::Ready {
                log::error!("sched run_task expected RecvBlock or Ready state{:#?}", state);
            }
        }
        schedule_machine(machine, &self.run_queue);
    }

    fn schedule_sendblock_machine(&self, key: usize) {
        // log::trace!("sched SendComplete machine {}", key);
        let machine = self.machines.get(key).unwrap();

        if let Err(state) = machine.compare_and_exchange_state(MachineState::SendBlock, MachineState::RecvBlock) {
            log::error!("sched: (SendBlock) expecting state SendBlock, found {:#?}", state);
            return;
        }
        if !machine.is_channel_empty()
            && machine
                .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
                .is_ok()
        {
            schedule_machine(machine, &self.run_queue);
        }
    }

    fn schedule_recvblock_machine(&self, key: usize) {
        // log::trace!("sched RecvBlock machine {}", key);
        let machine = self.machines.get(key).unwrap();
        if machine
            .compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready)
            .is_ok()
        {
            schedule_machine(machine, &self.run_queue);
        }
    }
}

#[cfg(test)]
mod tests {
    use self::executor::SystemExecutorFactory;
    use self::machine::get_default_channel_capacity;
    use self::overwatch::SystemMonitorFactory;
    use self::sched_factory::create_sched_factory;
    use super::*;
    use crossbeam::deque;
    use d3_derive::*;
    use std::time::Duration;

    use self::channel::{
        machine_channel::{channel, channel_with_capacity},
        receiver::Receiver,
        sender::Sender,
    };

    // Verify that a factory-built scheduler can be started and stopped
    // cleanly via its control interface.
    #[test]
    fn can_terminate() {
        let monitor_factory = SystemMonitorFactory::new();
        let executor_factory = SystemExecutorFactory::new();
        let scheduler_factory = create_sched_factory();

        let scheduler = scheduler_factory.start(monitor_factory.get_sender(), executor_factory.get_queues());
        thread::sleep(Duration::from_millis(100));
        log::info!("stopping scheduler via control");
        scheduler.stop();
        thread::sleep(Duration::from_millis(100));
    }

    // Minimal instruction set for exercising the scheduler.
    #[derive(Debug, MachineImpl)]
    pub enum TestMessage {
        Test,
    }

    // A simple Alice machine
    struct Alice {}
    impl Machine<TestMessage> for Alice {
        fn receive(&self, _message: TestMessage) {}
    }

    // Build a machine without assigning it to a server, returning the
    // machine, its sender, and the unassigned adapter.
    #[allow(clippy::type_complexity)]
    pub fn build_machine<T, P>(
        machine: T,
    ) -> (
        Arc<Mutex<T>>,
        Sender<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
        MachineAdapter,
    )
    where
        T: 'static + Machine<P> + Machine<<<P as MachineImpl>::Adapter as MachineBuilder>::InstructionSet>,
        P: MachineImpl,
        <P as MachineImpl>::Adapter: MachineBuilder,
    {
        let channel_max = get_default_channel_capacity();
        let (machine, sender, collective_adapter) = <<P as MachineImpl>::Adapter as MachineBuilder>::build_raw(machine, channel_max);
        // let collective_adapter = Arc::new(Mutex::new(collective_adapter));
        // Server::assign_machine(collective_adapter);
        (machine, sender, collective_adapter)
    }

    // Drive a bare SchedulerThread directly: insert five machines, send
    // one of them a message, then stop the thread and join it.
    #[test]
    fn test_scheduler() {
        let (monitor_sender, _monitor_receiver) = crossbeam::channel::unbounded::<MonitorMessage>();
        let (sched_sender, sched_receiver) = crossbeam::channel::unbounded::<SchedCmd>();
        let run_queue = Arc::new(deque::Injector::<Task>::new());
        let wait_queue = Arc::new(deque::Injector::<SchedTask>::new());

        let thread = SchedulerThread::spawn(sched_receiver, monitor_sender, (run_queue, wait_queue));
        // at this point the scheduler should be running
        std::thread::sleep(std::time::Duration::from_millis(10));

        let mut senders: Vec<Sender<TestMessage>> = Vec::new();
        let mut machines: Vec<Arc<Mutex<Alice>>> = Vec::new();
        // build 5 alice machines
        for _ in 1 ..= 5 {
            let alice = Alice {};
            let (alice, mut sender, adapter) = build_machine(alice);
            let adapter = Arc::new(adapter);
            sender.bind(Arc::downgrade(&adapter));
            senders.push(sender);
            machines.push(alice);
            sched_sender.send(SchedCmd::New(adapter)).unwrap();
        }

        let s = &senders[2];
        s.send(TestMessage::Test).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(500));

        sched_sender.send(SchedCmd::Stop).unwrap();
        if let Some(thread) = thread {
            thread.join().unwrap();
        }
    }
}