use self::collective::*;
use super::*;
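// A Task is a unit of work for the executors: a shareable machine stamped with a unique
// task id and the instant it was queued. `drop` marks a task whose machine's receiver has
// disconnected (see `is_receiver_disconnected`).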
#[doc(hidden)]
pub struct Task {
    id: usize,
    start: Instant,
    machine: ShareableMachine,
    drop: bool,
}
impl Task {
    pub fn new(machine: &ShareableMachine, drop: bool) -> Self {
        if !drop {
            match machine.get_state() {
                MachineState::RecvBlock => panic!("should not create task for RecvBlock machine"),
                MachineState::New => panic!("should not create task for New machine"),
                MachineState::Running => panic!("should not create task for Running machine"),
                MachineState::Ready => (),
                _ => panic!("should not create task for machine in state {:#?}", machine.get_state()),
            }
        }
        let id = TASK_ID.fetch_add(1, Ordering::SeqCst);
        if machine.get_task_id() != 0 {
            log::error!(
                "machine {} state {:#?} already on run_q as task {}",
                machine.get_key(),
                machine.get_state(),
                machine.get_task_id()
            );
            if !drop {
                panic!("machine already queued");
            }
        }
        machine.set_task_id(id);
        log::trace!("adding machine {} to run_q {}", machine.get_key(), id);
        Self {
            id,
            start: std::time::Instant::now(),
            machine: Arc::clone(machine),
            drop,
        }
    }

    pub fn is_invalid(&self, executor_id: usize) -> bool {
        if self.id != self.machine.get_task_id() {
            log::error!(
                "exec {}, task_id {} doesn't match machine {} task id {}",
                executor_id,
                self.id,
                self.machine.get_key(),
                self.machine.get_task_id(),
            );
            true
        } else {
            false
        }
    }

    #[inline]
    pub fn elapsed(&self) -> Duration { self.start.elapsed() }

    #[inline]
    pub fn machine(&self) -> ShareableMachine { Arc::clone(&self.machine) }

    #[inline]
    pub const fn task_id(&self) -> usize { self.id }

    #[inline]
    pub const fn is_receiver_disconnected(&self) -> bool { self.drop }
}
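/// Source of unique task ids. It starts at 1 so that a task id of 0 can serve as the
/// "not currently queued" marker checked in `Task::new`.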
pub static TASK_ID: AtomicUsize = AtomicUsize::new(1);

pub struct SchedTask {
    pub start: Instant,
    pub machine_key: usize,
}
impl SchedTask {
    pub fn new(machine_key: usize) -> Self {
        Self {
            start: Instant::now(),
            machine_key,
        }
    }
}
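/// Counters and timings reported per executor: work performed, blocked senders observed,
/// and time spent receiving, queued, and sleeping.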
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct ExecutorStats {
    pub id: usize,
    pub tasks_executed: u128,
    pub instructs_sent: u128,
    pub blocked_senders: u128,
    pub max_blocked_senders: usize,
    pub exhausted_slice: u128,
    pub recv_time: std::time::Duration,
    pub time_on_queue: std::time::Duration,
    pub disturbed_nap: u128,
    pub sleep_count: u128,
    pub sleep_time: std::time::Duration,
}

#[derive(Copy, Clone, Eq, PartialEq, SmartDefault, Debug)]
pub enum ExecutorState {
    #[default]
    Init,
    Drain,
    Parked,
    Running,
}
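/// Error returned by a sender adapter's `try_send`: `Full` means the channel still has no
/// capacity and the sender stays parked, `Disconnected` means the receiver is gone.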
#[doc(hidden)]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError {
    Full,
    Disconnected,
}

#[doc(hidden)]
pub struct MachineSenderAdapter {
    id: Uuid,
    key: usize,
    state: SharedMachineState,
    normalized_adapter: Box<dyn MachineDependentSenderAdapter>,
}
impl MachineSenderAdapter {
    pub fn new(machine: &ShareableMachine, adapter: Box<dyn MachineDependentSenderAdapter>) -> Self {
        Self {
            id: machine.get_id(),
            key: machine.get_key(),
            state: machine.state.clone(),
            normalized_adapter: adapter,
        }
    }
    pub const fn get_id(&self) -> Uuid { self.id }
    pub const fn get_key(&self) -> usize { self.key }
    pub fn try_send(&mut self) -> Result<usize, TrySendError> { self.normalized_adapter.try_send() }
}
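/// Object-safe adapter implemented per concrete sender; boxing it lets `MachineSenderAdapter`
/// retry a blocked send via `try_send` without knowing the underlying channel's types.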
#[doc(hidden)]
pub trait MachineDependentSenderAdapter {
    fn try_send(&mut self) -> Result<usize, TrySendError>;
}

#[derive(Debug)]
pub struct SharedExecutorInfo {
    state: ExecutorState,
}
impl SharedExecutorInfo {
    pub fn set_state(&mut self, new: ExecutorState) { self.state = new }
    pub const fn get_state(&self) -> ExecutorState { self.state }
    pub fn compare_set_state(&mut self, old: ExecutorState, new: ExecutorState) {
        if self.state == old {
            self.state = new
        }
    }
}
impl Default for SharedExecutorInfo {
    fn default() -> Self {
        Self {
            state: ExecutorState::Init,
        }
    }
}

use self::scheduler::executor::{EXECUTORS_SNOOZING, RUN_QUEUE_LEN};
use self::scheduler::setup_teardown::Server;
use self::scheduler::traits::TaskInjector;

pub fn schedule_machine(machine: &ShareableMachine, run_queue: &TaskInjector) { schedule_task(Task::new(machine, false), run_queue); }
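/// Push a task onto the run queue, bump `RUN_QUEUE_LEN`, and wake the executor threads if
/// any of them are snoozing.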
fn schedule_task(task: Task, run_queue: &TaskInjector) {
    RUN_QUEUE_LEN.fetch_add(1, Ordering::SeqCst);
    run_queue.push(task);
    if EXECUTORS_SNOOZING.load(Ordering::SeqCst) != 0 {
        Server::wake_executor_threads();
    }
}
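// Per-executor state stored in the `tls_executor_data` thread-local. The executor fills in
// the `ExecutorDataField` entries as it starts up; a default instance (id 0) is treated as a
// non-executor (e.g. main) thread, as in `block_or_continue`.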
#[doc(hidden)]
#[derive(Default)]
pub struct ExecutorData {
    pub id: usize,
    pub task_id: usize,
    pub machine: ExecutorDataField,
    pub blocked_senders: Vec<MachineSenderAdapter>,
    pub last_blocked_send_len: usize,
    pub notifier: ExecutorDataField,
    pub shared_info: Arc<Mutex<SharedExecutorInfo>>,
    pub run_queue: ExecutorDataField,
}
impl ExecutorData {
    pub fn block_or_continue() {
        tls_executor_data.with(|t| {
            let mut tls = t.borrow_mut();
            if tls.id == 0 {
                return;
            }
            if let ExecutorDataField::Machine(machine) = &mut tls.machine {
                if !machine.is_running() {
                    if !machine.is_send_blocked() {
                        log::error!(
                            "block_or_continue: expecting Running or SendBlock, found {:#?}",
                            machine.get_state()
                        );
                    }
                    tls.recursive_block();
                }
            }
        });
    }
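    // recursive_block is the re-entrant path taken when the machine currently being run by
    // this executor blocks on a send: the executor flips to Drain, works off its parked
    // senders via drain(), and then returns to Running.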
    pub fn recursive_block(&mut self) {
        if let ExecutorDataField::Machine(machine) = &self.machine {
            log::debug!(
                "recursive_block begin exec {}, machine {}, state {:#?}",
                self.id,
                machine.get_key(),
                machine.get_state()
            );
        }

        self.shared_info
            .lock()
            .compare_set_state(ExecutorState::Running, ExecutorState::Drain);

        self.drain();
        self.shared_info
            .lock()
            .compare_set_state(ExecutorState::Drain, ExecutorState::Running);

        if let ExecutorDataField::Machine(machine) = &self.machine {
            log::debug!(
                "recursive_block end exec {}, machine {}, state {:#?}",
                self.id,
                machine.get_key(),
                machine.get_state()
            );
        } else {
            log::error!("recursive_block end exec {} unable to locate machine", self.id);
        }
    }
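    // sender_blocked parks a send that could not complete: the machine is moved from Running
    // to SendBlock and its adapter joins blocked_senders until drain() can complete or
    // abandon the send.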
    pub fn sender_blocked(&mut self, channel_id: usize, adapter: MachineSenderAdapter) {
        if adapter.state.get() == MachineState::SendBlock {
            log::info!("Executor {} detected recursive send block, this should not happen", self.id);
            unreachable!("block_or_continue() should be called to prevent entering sender_blocked with a blocked machine")
        }

        if let ExecutorDataField::Machine(machine) = &self.machine {
            log::trace!(
                "executor {} machine {} state {:#?} parking sender {} task_id {}",
                self.id,
                machine.get_key(),
                machine.get_state(),
                channel_id,
                self.task_id,
            );

            if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::SendBlock) {
                log::error!("sender_block: expected state Running, found machine state {:#?}", state);
                log::error!(
                    "sender_block: expected state Running, found adapter state {:#?}",
                    adapter.state.get()
                );
                adapter.state.set(MachineState::SendBlock);
            }
            self.blocked_senders.push(adapter);
        }
    }
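    // drain retries every parked adapter in rounds. Sends that complete (or whose receiver
    // disconnected) for other machines are reported to the scheduler through the notifier;
    // once the send belonging to this executor's own machine completes, the drain ends so
    // that machine can continue. If every send stays Full, the linear backoff eventually
    // parks the executor.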
    fn drain(&mut self) {
        use MachineState::*;
        let (machine_key, machine_state) = match &self.machine {
            ExecutorDataField::Machine(machine) => (machine.get_key(), machine.state.clone()),
            _ => panic!("machine field was not set prior to running"),
        };

        let mut start_len = 0;
        log::trace!("exec {} drain blocked {}", self.id, self.blocked_senders.len());
        let backoff = LinearBackoff::new();
        while !self.blocked_senders.is_empty() {
            if start_len != self.blocked_senders.len() {
                start_len = self.blocked_senders.len();
                log::trace!("exec {} drain blocked {}", self.id, start_len);
            }
            let mut still_blocked: Vec<MachineSenderAdapter> = Vec::with_capacity(self.blocked_senders.len());
            let mut handled_recursive_sender = false;
            for mut sender in self.blocked_senders.drain(..) {
                match sender.try_send() {
                    Ok(_receiver_key) if sender.key == machine_key => {
                        backoff.reset();
                        if let Err(state) = machine_state.compare_and_exchange(SendBlock, Running) {
                            log::error!("drain: expected state SendBlock, found state {:#?}", state);
                            machine_state.set(Running);
                        }
                        handled_recursive_sender = true;
                    },
                    Err(TrySendError::Disconnected) if sender.key == machine_key => {
                        backoff.reset();
                        if let Err(state) = machine_state.compare_and_exchange(SendBlock, Running) {
                            log::debug!("drain: expected state SendBlock, found state {:#?}", state);
                            machine_state.set(Running);
                        }
                        handled_recursive_sender = true;
                    },
                    Ok(receiver_key) => {
                        backoff.reset();
                        match &self.notifier {
                            ExecutorDataField::Notifier(obj) => {
                                obj.notify_can_schedule_sender(sender.key);
                                obj.notify_can_schedule_receiver(receiver_key);
                            },
                            _ => log::error!("can't notify scheduler!!!"),
                        };
                    },
                    Err(TrySendError::Disconnected) => {
                        backoff.reset();
                        match &self.notifier {
                            ExecutorDataField::Notifier(obj) => obj.notify_can_schedule_sender(sender.key),
                            _ => log::error!("can't notify scheduler!!!"),
                        };
                    },
                    Err(TrySendError::Full) => {
                        still_blocked.push(sender);
                    },
                }
            }
            self.blocked_senders = still_blocked;
            if handled_recursive_sender {
                break;
            }
            if backoff.is_completed() && self.shared_info.lock().get_state() != ExecutorState::Parked {
                self.shared_info.lock().set_state(ExecutorState::Parked);
                match &self.notifier {
                    ExecutorDataField::Notifier(obj) => obj.notify_parked(self.id),
                    _ => log::error!("Executor {} doesn't have a notifier", self.id),
                };
            }
            backoff.snooze();
        }
        log::debug!("drained recursive sender, allowing send to continue");
    }
    pub fn schedule(machine: &ShareableMachine, drop: bool) {
        tls_executor_data.with(|t| {
            let tls = t.borrow();
            if log_enabled!(log::Level::Trace) {
                if let ExecutorDataField::Machine(tls_machine) = &tls.machine {
                    log::trace!(
                        "exec {} machine {} is scheduling machine {}",
                        tls.id,
                        tls_machine.get_key(),
                        machine.get_key()
                    );
                } else {
                    log::trace!("exec {} machine main-thread is scheduling machine {}", tls.id, machine.get_key());
                }
            }
            if let ExecutorDataField::RunQueue(run_q) = &tls.run_queue {
                schedule_task(Task::new(machine, drop), run_q);
            } else if let Ok(run_q) = Server::get_run_queue() {
                schedule_task(Task::new(machine, drop), &run_q);
            } else {
                log::error!("unable to obtain run_queue");
            }
        });
    }
}

#[doc(hidden)]
#[derive(SmartDefault)]
pub enum ExecutorDataField {
    #[default]
    Uninitialized,
    Notifier(ExecutorNotifierObj),
    Machine(ShareableMachine),
    RunQueue(Arc<crossbeam::deque::Injector<Task>>),
}
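// Hooks an executor uses to notify whoever implements this trait (the scheduler side):
// that the executor has parked, or that a machine can be scheduled again because its
// blocked send completed or its receiver disconnected.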
pub trait ExecutorNotifier: Send + Sync + 'static {
    fn notify_parked(&self, executor_id: usize);
    fn notify_can_schedule_sender(&self, machine_key: usize);
    fn notify_can_schedule_receiver(&self, machine_key: usize);
}
pub type ExecutorNotifierObj = std::sync::Arc<dyn ExecutorNotifier>;

thread_local! {
    #[doc(hidden)]
    #[allow(non_upper_case_globals)]
    pub static tls_executor_data: RefCell<ExecutorData> = RefCell::new(ExecutorData::default());
}

#[cfg(test)]
mod tests {
    #[allow(unused_imports)] use super::*;
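
    // A minimal sketch of unit tests for the pieces of this module that can be built
    // without a running scheduler: SchedTask construction and the SharedExecutorInfo
    // state transitions used by recursive_block/drain.
    #[test]
    fn shared_executor_info_transitions() {
        let mut info = SharedExecutorInfo::default();
        assert_eq!(info.get_state(), ExecutorState::Init);
        // compare_set_state only applies the new state when the current state matches `old`.
        info.compare_set_state(ExecutorState::Running, ExecutorState::Drain);
        assert_eq!(info.get_state(), ExecutorState::Init);
        info.compare_set_state(ExecutorState::Init, ExecutorState::Running);
        assert_eq!(info.get_state(), ExecutorState::Running);
    }

    #[test]
    fn sched_task_records_machine_key() {
        // `start` is captured at construction time; only the key is checked here.
        let task = SchedTask::new(42);
        assert_eq!(task.machine_key, 42);
    }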
}