d3_core/tls/tls_executor.rs

use self::collective::*;
use super::*;
// This is the TLS data for the executor. It is used by the channel and the executor;
// otherwise, this would be much higher in the stack.

// A Task for the executor.
#[doc(hidden)]
pub struct Task {
    id: usize,
    start: Instant,
    machine: ShareableMachine,
    // indicates that a drop is in progress
    drop: bool,
}
impl Task {
    pub fn new(machine: &ShareableMachine, drop: bool) -> Self {
        if !drop {
            match machine.get_state() {
                MachineState::RecvBlock => panic!("should not create task for RecvBlock machine"),
                MachineState::New => panic!("should not create task for New machine"),
                MachineState::Running => panic!("should not create task for Running machine"),
                MachineState::Ready => (),
                _ => panic!("should not create task for machine in state {:#?}", machine.get_state()),
            }
        }
        let id = TASK_ID.fetch_add(1, Ordering::SeqCst);
        if machine.get_task_id() != 0 {
            log::error!(
                "machine {} state {:#?} already on run_q as task {}",
                machine.get_key(),
                machine.get_state(),
                machine.get_task_id()
            );
            if !drop {
                panic!("machine already queued");
            }
        }
        machine.set_task_id(id);
        log::trace!("adding machine {} to run_q {}", machine.get_key(), id);
        Self {
            id,
            start: std::time::Instant::now(),
            machine: Arc::clone(machine),
            drop,
        }
    }

    pub fn is_invalid(&self, executor_id: usize) -> bool {
        if self.id != self.machine.get_task_id() {
            log::error!(
                "exec {}, task_id {} doesn't match machine {} task id {}",
                executor_id,
                self.id,
                self.machine.get_key(),
                self.machine.get_task_id(),
            );
            true
        } else {
            false
        }
    }

    // ==== Getters and Predicates ====

    #[inline]
    pub fn elapsed(&self) -> Duration { self.start.elapsed() }

    #[inline]
    pub fn machine(&self) -> ShareableMachine { Arc::clone(&self.machine) }

    #[inline]
    pub const fn task_id(&self) -> usize { self.id }

    #[inline]
    pub const fn is_receiver_disconnected(&self) -> bool { self.drop }
}
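// Monotonic source of task ids. It starts at 1 because a task id of 0 on a
// machine means "not currently queued" (see the check in Task::new).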
pub static TASK_ID: AtomicUsize = AtomicUsize::new(1);

// A task for the scheduler, which will reschedule the machine
pub struct SchedTask {
    pub start: Instant,
    pub machine_key: usize,
}
impl SchedTask {
    pub fn new(machine_key: usize) -> Self {
        Self {
            start: Instant::now(),
            machine_key,
        }
    }
}

/// ExecutorStats exposes metrics for each executor.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct ExecutorStats {
    pub id: usize,
    pub tasks_executed: u128,
    pub instructs_sent: u128,
    pub blocked_senders: u128,
    pub max_blocked_senders: usize,
    pub exhausted_slice: u128,
    pub recv_time: std::time::Duration,
    pub time_on_queue: std::time::Duration,
    pub disturbed_nap: u128,
    pub sleep_count: u128,
    pub sleep_time: std::time::Duration,
}

// The state of the executor
#[derive(Copy, Clone, Eq, PartialEq, SmartDefault, Debug)]
pub enum ExecutorState {
    #[default]
    Init,
    Drain,
    Parked,
    Running,
}

// Encapsulated send errors
#[doc(hidden)]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError {
    // The message could not be sent because the channel is full and the operation timed out.
    Full,
    // The message could not be sent because the channel is disconnected.
    Disconnected,
}
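
// A minimal conversion sketch, assuming the channel layer is built on
// crossbeam's channel (crossbeam is already a dependency for the run queue's
// Injector). The payload-carrying crossbeam error is flattened into the enum
// above; callers keep the unsent message themselves for any retry.
impl<T> From<crossbeam::channel::TrySendError<T>> for TrySendError {
    fn from(err: crossbeam::channel::TrySendError<T>) -> Self {
        match err {
            crossbeam::channel::TrySendError::Full(_) => Self::Full,
            crossbeam::channel::TrySendError::Disconnected(_) => Self::Disconnected,
        }
    }
}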

// Analogous to the ShareableMachine, the MachineSenderAdapter encapsulates an
// adapter: a wrapped MachineDependentSenderAdapter, which in turn encapsulates
// the Sender<T> and the T being sent.
#[doc(hidden)]
pub struct MachineSenderAdapter {
    id: Uuid,
    key: usize,
    state: SharedMachineState,
    // the normalized_adapter is an ugly trait object, which needs fixing
    normalized_adapter: Box<dyn MachineDependentSenderAdapter>,
}
impl MachineSenderAdapter {
    pub fn new(machine: &ShareableMachine, adapter: Box<dyn MachineDependentSenderAdapter>) -> Self {
        Self {
            id: machine.get_id(),
            key: machine.get_key(),
            state: machine.state.clone(),
            normalized_adapter: adapter,
        }
    }
    // Get the id of the sending machine
    pub const fn get_id(&self) -> Uuid { self.id }
    // Get the key of the sending machine
    pub const fn get_key(&self) -> usize { self.key }
    // Try to send the message
    pub fn try_send(&mut self) -> Result<usize, TrySendError> { self.normalized_adapter.try_send() }
}

#[doc(hidden)]
pub trait MachineDependentSenderAdapter {
    fn try_send(&mut self) -> Result<usize, TrySendError>;
}
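
// A minimal sketch of a concrete adapter, assuming a crossbeam-backed channel;
// the real adapter lives in the channel layer and may differ. It owns the
// Sender<T>, the undelivered T, and the receiving machine's key, which is
// returned on success so the scheduler can wake the receiver.
#[doc(hidden)]
pub struct ExampleSenderAdapter<T: Send> {
    sender: crossbeam::channel::Sender<T>,
    message: Option<T>,
    receiver_key: usize,
}
impl<T: Send> MachineDependentSenderAdapter for ExampleSenderAdapter<T> {
    fn try_send(&mut self) -> Result<usize, TrySendError> {
        // take() the message so a successful send consumes it exactly once
        let msg = self.message.take().expect("try_send called after successful send");
        match self.sender.try_send(msg) {
            Ok(()) => Ok(self.receiver_key),
            // on failure, crossbeam hands the message back; park it for the retry
            Err(crossbeam::channel::TrySendError::Full(m)) => {
                self.message = Some(m);
                Err(TrySendError::Full)
            },
            Err(crossbeam::channel::TrySendError::Disconnected(m)) => {
                self.message = Some(m);
                Err(TrySendError::Disconnected)
            },
        }
    }
}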

// This is information that the executor thread shares with the worker, giving
// the big executor insight into what the executor thread is up to.
#[derive(Debug)]
pub struct SharedExecutorInfo {
    state: ExecutorState,
}
impl SharedExecutorInfo {
    pub fn set_state(&mut self, new: ExecutorState) { self.state = new }
    pub const fn get_state(&self) -> ExecutorState { self.state }
    pub fn compare_set_state(&mut self, old: ExecutorState, new: ExecutorState) {
        if self.state == old {
            self.state = new
        }
    }
}
impl Default for SharedExecutorInfo {
    fn default() -> Self {
        Self {
            state: ExecutorState::Init,
        }
    }
}

use self::scheduler::executor::{EXECUTORS_SNOOZING, RUN_QUEUE_LEN};
use self::scheduler::setup_teardown::Server;
use self::scheduler::traits::TaskInjector;

pub fn schedule_machine(machine: &ShareableMachine, run_queue: &TaskInjector) { schedule_task(Task::new(machine, false), run_queue); }

fn schedule_task(task: Task, run_queue: &TaskInjector) {
    RUN_QUEUE_LEN.fetch_add(1, Ordering::SeqCst);
    run_queue.push(task);
    if EXECUTORS_SNOOZING.load(Ordering::SeqCst) != 0 {
        Server::wake_executor_threads();
    }
}
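
// A minimal usage sketch (hypothetical caller): a machine that has just
// transitioned to Ready is pushed onto the global run queue, mirroring the
// fallback path in ExecutorData::schedule below.
//
//     if let Ok(run_q) = Server::get_run_queue() {
//         schedule_machine(&machine, &run_q);
//     }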

// ExecutorData is TLS for the executor. Among other things, it provides bridging
// for the channel to allow a sender to park, while allowing the executor to continue
// processing work.
#[doc(hidden)]
#[derive(Default)]
pub struct ExecutorData {
    pub id: usize,
    pub task_id: usize,
    pub machine: ExecutorDataField,
    pub blocked_senders: Vec<MachineSenderAdapter>,
    pub last_blocked_send_len: usize,
    pub notifier: ExecutorDataField,
    pub shared_info: Arc<Mutex<SharedExecutorInfo>>,
    pub run_queue: ExecutorDataField,
}
impl ExecutorData {
    pub fn block_or_continue() {
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            // main thread can always continue and block
            if tls.id == 0 {
                return;
            }
            // executor thread can continue if machine is in Running state
            if let ExecutorDataField::Machine(machine) = &mut tls.machine {
                if !machine.is_running() {
                    if !machine.is_send_blocked() {
                        log::error!(
                            "block_or_continue: expecting Running or SendBlock, found {:#?}",
                            machine.get_state()
                        );
                    }
                    // this executor is idle until the send that is in progress completes
                    // stacking it would transform a bounded queue into an unbounded queue -- so don't stack
                    tls.recursive_block();
                }
            }
        });
    }
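
    // A sketch of the expected call pattern from the channel's send path
    // (hypothetical shape; the real channel code differs): the sender first
    // calls block_or_continue() so a machine with a send already in flight
    // drains before stacking another, then parks the unsent message via
    // sender_blocked() when the channel is full.
    //
    //     ExecutorData::block_or_continue();
    //     if channel_is_full {
    //         tls_executor_data.with(|t| t.borrow_mut().sender_blocked(channel_id, adapter));
    //     }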
    pub fn recursive_block(&mut self) {
        // we're called from a tls context, ExecutorData is for the current thread.
        // we've already queued the sender, and now it's trying to send more. This
        // could go on forever, essentially blocking an executor. So, we're going
        // to pause and drain this executor and then allow the send that got us
        // here to continue, having sent the one that blocked it

        if let ExecutorDataField::Machine(machine) = &self.machine {
            log::debug!(
                "recursive_block begin exec {}, machine {}, state {:#?}",
                self.id,
                machine.get_key(),
                machine.get_state()
            );
        }

        // if running, change to drain
        self.shared_info
            .lock()
            .compare_set_state(ExecutorState::Running, ExecutorState::Drain);

        self.drain();
        // when drain returns, set back to running
        self.shared_info
            .lock()
            .compare_set_state(ExecutorState::Drain, ExecutorState::Running);

        if let ExecutorDataField::Machine(machine) = &self.machine {
            log::debug!(
                "recursive_block end exec {}, machine {}, state {:#?}",
                self.id,
                machine.get_key(),
                machine.get_state()
            );
        } else {
            log::error!("recursive_block end exec {} unable to locate machine", self.id);
        }
    }

    pub fn sender_blocked(&mut self, channel_id: usize, adapter: MachineSenderAdapter) {
        // we're called from a tls context, ExecutorData is for the current thread.
        // upon return, the executor will return into the channel send, which will
        // complete the blocked send. Consequently, we need to be careful
        // about maintaining send order on a recursive entry.
        if adapter.state.get() == MachineState::SendBlock {
            // if we are already SendBlock, then there is send looping within the
            // machine, and we need to use caution
            log::info!("Executor {} detected recursive send block, this should not happen", self.id);
            unreachable!("block_or_continue() should be called to prevent entering sender_blocked with a blocked machine")
        }

        // otherwise we can stack the incomplete send. Depth is a concern.
        // The sends could be offloaded; however, that has the potential to
        // cause a problem with the aforementioned looping sender.
        if let ExecutorDataField::Machine(machine) = &self.machine {
            log::trace!(
                "executor {} machine {} state {:#?} parking sender {} task_id {}",
                self.id,
                machine.get_key(),
                machine.get_state(),
                channel_id,
                self.task_id,
            );

            if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::SendBlock) {
                log::error!("sender_block: expected state Running, found machine state {:#?}", state);
                log::error!(
                    "sender_block: expected state Running, found adapter state {:#?}",
                    adapter.state.get()
                );
                adapter.state.set(MachineState::SendBlock);
            }
            self.blocked_senders.push(adapter);
        }
    }

    fn drain(&mut self) {
        use MachineState::*;
        // all we can do at this point is attempt to drain our sender queue
        let (machine_key, machine_state) = match &self.machine {
            ExecutorDataField::Machine(machine) => (machine.get_key(), machine.state.clone()),
            _ => panic!("machine field was not set prior to running"),
        };

        let mut start_len = 0;
        log::trace!("exec {} drain blocked {}", self.id, start_len);
        let backoff = LinearBackoff::new();
        while !self.blocked_senders.is_empty() {
            if start_len != self.blocked_senders.len() {
                start_len = self.blocked_senders.len();
                log::trace!("exec {} drain blocked {}", self.id, start_len);
            }
            let mut still_blocked: Vec<MachineSenderAdapter> = Vec::with_capacity(self.blocked_senders.len());
            let mut handled_recursive_sender = false;
            for mut sender in self.blocked_senders.drain(..) {
                match sender.try_send() {
                    // handle the blocked sender that got us here
                    Ok(_receiver_key) if sender.key == machine_key => {
                        backoff.reset();
                        // log::debug!("drain recursive machine ok state {:#?}", machine_state.get());
                        if let Err(state) = machine_state.compare_and_exchange(SendBlock, Running) {
                            log::error!("drain: expected state SendBlock, found state {:#?}", state);
                            machine_state.set(Running);
                        }
                        handled_recursive_sender = true;
                    },
                    Err(TrySendError::Disconnected) if sender.key == machine_key => {
                        backoff.reset();
                        // log::debug!("drain recursive machine disconnected state {:#?}", machine_state.get());
                        if let Err(state) = machine_state.compare_and_exchange(SendBlock, Running) {
                            log::debug!("drain: expected state SendBlock, found state {:#?}", state);
                            machine_state.set(Running);
                        }
                        handled_recursive_sender = true;
                    },
                    // handle all others
                    Ok(receiver_key) => {
                        backoff.reset();
                        // let the scheduler know that this machine can now be scheduled
                        // log::debug!("drain recursive other machine ok state {:#?}", machine_state.get());
                        match &self.notifier {
                            ExecutorDataField::Notifier(obj) => {
                                obj.notify_can_schedule_sender(sender.key);
                                obj.notify_can_schedule_receiver(receiver_key);
                            },
                            _ => log::error!("can't notify scheduler!!!"),
                        };
                    },
                    Err(TrySendError::Disconnected) => {
                        backoff.reset();
                        // log::debug!("drain recursive other machine disconnected state {:#?}", machine_state.get());
                        // let the scheduler know that this machine can now be scheduled
                        match &self.notifier {
                            ExecutorDataField::Notifier(obj) => obj.notify_can_schedule_sender(sender.key),
                            _ => log::error!("can't notify scheduler!!!"),
                        };
                    },
                    Err(TrySendError::Full) => {
                        still_blocked.push(sender);
                    },
                }
            }
            self.blocked_senders = still_blocked;
            if handled_recursive_sender {
                break;
            }
            // if we haven't worked our way free, then we need to notify that we're kinda stuck.
            // Even though we've done that, we may yet come free. As long as we're not told to
            // terminate, we'll keep running.
            if backoff.is_completed() && self.shared_info.lock().get_state() != ExecutorState::Parked {
                // we need to notify the monitor that we're essentially dead.
                self.shared_info.lock().set_state(ExecutorState::Parked);
                match &self.notifier {
                    ExecutorDataField::Notifier(obj) => obj.notify_parked(self.id),
                    _ => log::error!("Executor {} doesn't have a notifier", self.id),
                };
            }
            backoff.snooze();
        }
        log::debug!("drained recursive sender, allowing send to continue");
    }

    pub fn schedule(machine: &ShareableMachine, drop: bool) {
        tls_executor_data.with(|t| {
            let tls = t.borrow();
            if log_enabled!(log::Level::Trace) {
                if let ExecutorDataField::Machine(tls_machine) = &tls.machine {
                    log::trace!(
                        "exec {} machine {} is scheduling machine {}",
                        tls.id,
                        tls_machine.get_key(),
                        machine.get_key()
                    );
                } else {
                    log::trace!("exec {} machine main-thread is scheduling machine {}", tls.id, machine.get_key());
                }
            }
            if let ExecutorDataField::RunQueue(run_q) = &tls.run_queue {
                schedule_task(Task::new(machine, drop), run_q);
            } else {
                // gotta do this the hard way
                if let Ok(run_q) = Server::get_run_queue() {
                    schedule_task(Task::new(machine, drop), &run_q);
                } else {
                    log::error!("unable to obtain run_queue");
                }
            }
        });
    }
}

// Encoding the structs as variants allows them to be stored in the TLS as a field.
#[doc(hidden)]
#[derive(SmartDefault)]
pub enum ExecutorDataField {
    #[default]
    Uninitialized,
    Notifier(ExecutorNotifierObj),
    Machine(ShareableMachine),
    RunQueue(Arc<crossbeam::deque::Injector<Task>>),
}

// The trait that allows the executor to perform notifications
pub trait ExecutorNotifier: Send + Sync + 'static {
    // Send a notification that the executor is parked
    fn notify_parked(&self, executor_id: usize);
    // Send a notification that a parked sender is no longer parked, and can be scheduled
    fn notify_can_schedule_sender(&self, machine_key: usize);
    // Send a notification that a parked sender's receiver may need to be scheduled
    fn notify_can_schedule_receiver(&self, machine_key: usize);
}
pub type ExecutorNotifierObj = std::sync::Arc<dyn ExecutorNotifier>;

thread_local! {
    #[doc(hidden)]
    #[allow(non_upper_case_globals)]
    pub static tls_executor_data: RefCell<ExecutorData> = RefCell::new(ExecutorData::default());
}
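
// A sketch of how an executor thread might seed its TLS at startup
// (hypothetical; the real setup lives in the executor itself):
//
//     tls_executor_data.with(|t| {
//         let mut tls = t.borrow_mut();
//         tls.id = executor_id;
//         tls.notifier = ExecutorDataField::Notifier(notifier);
//         tls.run_queue = ExecutorDataField::RunQueue(run_queue);
//     });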

#[cfg(test)]
mod tests {
    #[allow(unused_imports)] use super::*;
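
    // A sanity check for the hedged From conversion sketch above, assuming
    // crossbeam's bounded-channel semantics.
    #[test]
    fn try_send_error_maps_from_crossbeam() {
        let (s, r) = crossbeam::channel::bounded::<usize>(1);
        s.try_send(1).unwrap();
        // the buffer is full and the receiver is still alive
        let full: TrySendError = s.try_send(2).unwrap_err().into();
        assert!(matches!(full, TrySendError::Full));
        // drain, then disconnect by dropping the receiver
        r.recv().unwrap();
        drop(r);
        let disconnected: TrySendError = s.try_send(3).unwrap_err().into();
        assert!(matches!(disconnected, TrySendError::Disconnected));
    }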
}