use self::collective::*;
use super::*;
// This is the TLS data for the executor. It is shared by the channel and the
// executor; otherwise, it would live much higher in the stack.

// A Task is a unit of work for the executor: a machine plus the instant it was queued.
#[doc(hidden)]
pub struct Task {
    pub start: Instant,
    pub machine: ShareableMachine,
}
impl Task {
    pub fn new(machine: &ShareableMachine) -> Self {
        Self {
            start: Instant::now(),
            machine: Arc::clone(machine),
        }
    }
}

// A task for the scheduler, which will reschedule the machine
pub struct SchedTask {
    pub start: Instant,
    pub machine_key: usize,
}
impl SchedTask {
    pub fn new(machine_key: usize) -> Self {
        Self {
            start: Instant::now(),
            machine_key,
        }
    }
}
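
// A small sketch (assumed usage, not part of this module's API): when a
// SchedTask is dequeued, the elapsed time since construction is the kind of
// measurement that feeds a stat such as ExecutorStats::time_on_queue.
#[cfg(test)]
#[allow(dead_code)]
fn queue_latency(task: &SchedTask) -> Duration { task.start.elapsed() }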

/// ExecutorStats exposes metrics for each executor.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct ExecutorStats {
    pub id: usize,
    pub tasks_executed: u128,
    pub instructs_sent: u128,
    pub blocked_senders: u128,
    pub max_blocked_senders: usize,
    pub exhausted_slice: u128,
    pub recv_time: Duration,
    pub time_on_queue: Duration,
}

// The state of the executor
#[derive(Copy, Clone, Eq, PartialEq, SmartDefault, Debug)]
pub enum ExecutorState {
    #[default]
    Init,
    Drain,
    Parked,
    Running,
}

// Encapsulated send errors
#[doc(hidden)]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError {
    // The message could not be sent because the channel is full and the operation timed out.
    Full,
    // The message could not be sent because the channel is disconnected.
    Disconnected,
}

// Analogous to the ShareableMachine, the SharedCollectiveSenderAdapter encapsulates
// an adapter: a boxed CollectiveSenderAdapter, which in turn wraps a Sender<T> and the T to send.
#[doc(hidden)]
pub struct SharedCollectiveSenderAdapter {
    pub id: Uuid,
    pub key: usize,
    pub state: MachineState,
    // the normalized_adapter is an ugly trait object, which needs fixing
    pub normalized_adapter: CommonCollectiveSenderAdapter,
}
impl SharedCollectiveSenderAdapter {
    // Get the id of the sending machine
    pub const fn get_id(&self) -> Uuid { self.id }
    // Get the key of the sending machine
    pub const fn get_key(&self) -> usize { self.key }
    // Try to send the message
    pub fn try_send(&mut self) -> Result<(), TrySendError> { self.normalized_adapter.try_send() }
}

#[doc(hidden)]
pub trait CollectiveSenderAdapter {
    // Get the id of the sending machine
    fn get_id(&self) -> Uuid;
    // Get the key of the sending machine
    fn get_key(&self) -> usize;
    // Try to send the message
    fn try_send(&mut self) -> Result<(), TrySendError>;
}
pub type CommonCollectiveSenderAdapter = Box<dyn CollectiveSenderAdapter>;
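
// A minimal sketch of a concrete adapter (ExampleSenderAdapter is hypothetical,
// not part of this module, and assumes a crossbeam channel): it pairs a Sender
// with the message that failed to send, and maps crossbeam's error variants
// onto the local TrySendError. Compiled only for tests.
#[cfg(test)]
#[allow(dead_code)]
struct ExampleSenderAdapter<T: Send + 'static> {
    id: Uuid,
    key: usize,
    sender: crossbeam::channel::Sender<T>,
    // the undelivered message; consumed on a successful send
    message: Option<T>,
}
#[cfg(test)]
impl<T: Send + 'static> CollectiveSenderAdapter for ExampleSenderAdapter<T> {
    fn get_id(&self) -> Uuid { self.id }
    fn get_key(&self) -> usize { self.key }
    fn try_send(&mut self) -> Result<(), TrySendError> {
        // take() the message so a successful send consumes it; on Full it is
        // put back so the executor can retry from its blocked_senders list
        let msg = match self.message.take() {
            Some(m) => m,
            None => return Ok(()),
        };
        match self.sender.try_send(msg) {
            Ok(()) => Ok(()),
            Err(crossbeam::channel::TrySendError::Full(m)) => {
                self.message = Some(m);
                Err(TrySendError::Full)
            },
            Err(crossbeam::channel::TrySendError::Disconnected(_)) => Err(TrySendError::Disconnected),
        }
    }
}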

// This is the information that the executor thread shares with the monitor,
// giving the supervising executor insight into what this executor is up to.
#[derive(Debug)]
pub struct SharedExecutorInfo {
    state: ExecutorState,
    start_idle: Instant,
}
impl SharedExecutorInfo {
    pub fn set_idle(&mut self) -> Instant {
        self.start_idle = Instant::now();
        self.start_idle
    }
    pub fn set_state(&mut self, new: ExecutorState) { self.state = new }
    pub const fn get_state(&self) -> ExecutorState { self.state }
    pub fn compare_set_state(&mut self, old: ExecutorState, new: ExecutorState) {
        if self.state == old {
            self.state = new
        }
    }
    pub fn get_state_and_elapsed(&self) -> (ExecutorState, Duration) { (self.state, self.start_idle.elapsed()) }
}
impl Default for SharedExecutorInfo {
    fn default() -> Self {
        Self {
            state: ExecutorState::Init,
            start_idle: Instant::now(),
        }
    }
}
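
// A sketch of how a monitor might sample the shared info (an assumed usage
// pattern; is_stalled is illustrative, not part of this module): lock, read
// the state and idle time together, and flag a long-idle parked executor.
#[cfg(test)]
#[allow(dead_code)]
fn is_stalled(info: &Arc<Mutex<SharedExecutorInfo>>, limit: Duration) -> bool {
    let (state, idle) = info.lock().unwrap().get_state_and_elapsed();
    state == ExecutorState::Parked && idle > limit
}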

// ExecutorData is the executor's TLS. Among other things, it provides the
// bridging that allows a channel sender to park while the executor continues
// processing work.
#[doc(hidden)]
#[derive(Default)]
pub struct ExecutorData {
    pub id: usize,
    pub machine: ExecutorDataField,
    pub blocked_senders: Vec<SharedCollectiveSenderAdapter>,
    pub last_blocked_send_len: usize,
    pub notifier: ExecutorDataField,
    pub shared_info: Arc<Mutex<SharedExecutorInfo>>,
}
impl ExecutorData {
    pub fn block_or_continue() {
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            // main thread can always continue and block
            if tls.id == 0 {
                return;
            }
            if let ExecutorDataField::Machine(machine) = &tls.machine {
                if machine.state.get() != CollectiveState::Running {
                    tls.recursive_block();
                }
            }
        });
    }
    pub fn recursive_block(&mut self) {
        // We're called from a TLS context; this ExecutorData belongs to the current
        // thread. We've already queued the sender, and now it's trying to send more.
        // This could go on forever, essentially blocking an executor. So we're going
        // to pause and drain this executor, then allow the send that got us here to
        // continue, having sent the one that blocked it.

        // if running, change to drain
        self.shared_info
            .lock()
            .unwrap()
            .compare_set_state(ExecutorState::Running, ExecutorState::Drain);
        self.drain();
        let mut mutable = self.shared_info.lock().unwrap();
        // when drain returns, set back to running and reset idle
        mutable.compare_set_state(ExecutorState::Drain, ExecutorState::Running);
        mutable.set_idle();
    }
    pub fn sender_blocked(&mut self, channel_id: usize, adapter: SharedCollectiveSenderAdapter) {
        // We're called from a TLS context; this ExecutorData belongs to the current
        // thread. Upon return, the executor goes back into the channel send, which
        // will complete the blocked send. Consequently, we need to be careful about
        // maintaining send order on a recursive entry. A usage sketch follows this impl.
        if adapter.state.get() == CollectiveState::SendBlock {
            // if we are already SendBlock, then there is send looping within the
            // machine, and we need to use caution
            log::info!(
                "Executor {} detected recursive send block, this should not happen",
                self.id
            );
            unreachable!(
                "block_or_continue() should be called to prevent entering sender_blocked with a blocked machine"
            )
        } else {
            // Otherwise we can stack the incomplete send. Depth is a concern: the
            // sends could be offloaded, but that has the potential to cause a
            // problem with the aforementioned looping sender.
            log::trace!("executor {} parking sender {}", self.id, channel_id);
            adapter.state.set(CollectiveState::SendBlock);
            self.blocked_senders.push(adapter);
        }
    }
    fn drain(&mut self) {
        // all we can do at this point is attempt to drain our sender queue
        let backoff = crossbeam::utils::Backoff::new();
        let (machine_key, machine_state) = match &self.machine {
            ExecutorDataField::Machine(machine) => (machine.key, machine.state.clone()),
            _ => panic!("machine field was not set prior to running"),
        };
        while !self.blocked_senders.is_empty() {
            self.shared_info.lock().unwrap().set_idle();
            let mut still_blocked: Vec<SharedCollectiveSenderAdapter> = Vec::with_capacity(self.blocked_senders.len());
            let mut handled_recursive_sender = false;
            for mut sender in self.blocked_senders.drain(..) {
                match sender.try_send() {
                    // handle the blocked sender that got us here; success and
                    // disconnect both unblock the machine
                    Ok(()) | Err(TrySendError::Disconnected) if sender.key == machine_key => {
                        backoff.reset();
                        machine_state.set(CollectiveState::Running);
                        handled_recursive_sender = true;
                    },
                    // handle all others
                    Ok(()) | Err(TrySendError::Disconnected) => {
                        backoff.reset();
                        // let the scheduler know that this machine can now be scheduled
                        match &self.notifier {
                            ExecutorDataField::Notifier(obj) => obj.notify_can_schedule(sender.key),
                            _ => log::error!("can't notify scheduler!!!"),
                        };
                    },
                    Err(TrySendError::Full) => still_blocked.push(sender),
                }
            }
            self.blocked_senders = still_blocked;
            if handled_recursive_sender {
                break;
            }
            // If we haven't worked our way free, we need to notify that we're stuck.
            // Even so, we may yet come free; as long as we're not told to terminate,
            // we'll keep running.
            if backoff.is_completed() && self.shared_info.lock().unwrap().get_state() != ExecutorState::Parked {
                // we need to notify the monitor that we're essentially dead.
                self.shared_info
                    .lock()
                    .unwrap()
                    .set_state(ExecutorState::Parked);
                match &self.notifier {
                    ExecutorDataField::Notifier(obj) => obj.notify_parked(self.id),
                    _ => log::error!("Executor {} doesn't have a notifier", self.id),
                };
            }
            backoff.snooze();
        }
        log::debug!("drained recursive sender, allowing send to continue");
    }
}
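
// A sketch of how a channel sender is expected to drive this bridging
// (park_on_full_channel and its shape are assumptions; the real channel code
// lives elsewhere). Before parking, the sender first yields to any drain in
// progress, then records the incomplete send in the executor's TLS.
#[cfg(test)]
#[allow(dead_code)]
fn park_on_full_channel(channel_id: usize, adapter: SharedCollectiveSenderAdapter) {
    // don't begin parking while this machine is already mid-drain
    ExecutorData::block_or_continue();
    // hand the incomplete send to the executor; drain() will retry it
    tls_executor_data.with(|t| t.borrow_mut().sender_blocked(channel_id, adapter));
}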

// Encoding the structs as variants allows them to be stored in the TLS as fields.
#[doc(hidden)]
#[derive(SmartDefault)]
pub enum ExecutorDataField {
    #[default]
    Uninitialized,
    Notifier(ExecutorNotifierObj),
    Machine(ShareableMachine),
}

// The trait that allows the executor to perform notifications
pub trait ExecutorNotifier: Send + Sync + 'static {
    // Send a notification that the executor is parked
    fn notify_parked(&self, executor_id: usize);
    // Send a notification that a parked sender is no longer parked, and can be scheduled
    fn notify_can_schedule(&self, machine_key: usize);
}
pub type ExecutorNotifierObj = std::sync::Arc<dyn ExecutorNotifier>;
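
// A minimal sketch of a notifier (ChannelNotifier and SchedEvent are
// hypothetical, assuming a crossbeam channel to a monitor thread): it simply
// forwards notifications as events. Compiled only for tests.
#[cfg(test)]
#[allow(dead_code)]
enum SchedEvent {
    Parked(usize),
    CanSchedule(usize),
}
#[cfg(test)]
#[allow(dead_code)]
struct ChannelNotifier {
    sender: crossbeam::channel::Sender<SchedEvent>,
}
#[cfg(test)]
impl ExecutorNotifier for ChannelNotifier {
    fn notify_parked(&self, executor_id: usize) {
        // notification is best-effort; ignore a disconnected monitor
        let _ = self.sender.send(SchedEvent::Parked(executor_id));
    }
    fn notify_can_schedule(&self, machine_key: usize) {
        let _ = self.sender.send(SchedEvent::CanSchedule(machine_key));
    }
}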

thread_local! {
    #[doc(hidden)]
    #[allow(non_upper_case_globals)]
    pub static tls_executor_data: RefCell<ExecutorData> = RefCell::new(ExecutorData::default());
}
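
// The typical access pattern for the TLS (a sketch; current_executor_id is
// illustrative): borrow the thread's ExecutorData inside with().
#[cfg(test)]
#[allow(dead_code)]
fn current_executor_id() -> usize {
    tls_executor_data.with(|t| t.borrow().id)
}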

#[cfg(test)]
mod tests {
    #[allow(unused_imports)] use super::*;
}