1use crate::{accurate_sleep_until, InspectorReport, ScheduleExecutor};
4use nodo::codelet::{Clocks, NodeletId, NodeletSetup, WorkerId};
5
6pub struct Executor {
7 next_worker_id: WorkerId,
8 clocks: Clocks,
9 workers: Vec<Worker>,
10}
11
/// Requests sent from a [`Worker`] handle to its worker thread.
///
/// The worker thread polls for at most one request per loop iteration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerRequest {
    /// Leave the worker loop; the schedule is then finalized and a last report is sent.
    Stop,
    /// Send back a `WorkerReply::Report` built from the schedule's current report.
    Report,
}
16
17pub enum WorkerReply {
18 Report(InspectorReport),
19}
20
21pub struct WorkerState {
22 schedule: ScheduleExecutor,
23 rx_request: std::sync::mpsc::Receiver<WorkerRequest>,
24 tx_reply: std::sync::mpsc::Sender<WorkerReply>,
25}
26
27impl Executor {
28 pub fn new() -> Self {
29 Self {
30 next_worker_id: WorkerId(0),
31 clocks: Clocks::new(),
32 workers: Vec::new(),
33 }
34 }
35
36 pub fn push(&mut self, mut schedule: ScheduleExecutor) {
37 let worker_id = self.next_worker_id;
38 self.next_worker_id.0 += 1;
39
40 schedule.setup(NodeletSetup {
41 clocks: self.clocks.clone(),
42 nodelet_id_issue: NodeletId(worker_id, 0),
43 });
44
45 self.workers.push(Worker::new(schedule));
46 }
47
48 pub fn is_finished(&self) -> bool {
49 self.workers.iter().all(|w| w.is_finished())
50 }
51
52 pub fn join(&mut self) {
53 for w in self.workers.iter_mut() {
54 w.join()
55 .map_err(|err| {
56 log::error!(
57 "Could not join thread of worker '{}': {err:?}. Maybe it panicked previously.",
58 w.name
59 )
60 })
61 .ok();
62 }
63 }
64
65 pub fn request_stop(&mut self) {
66 for w in self.workers.iter() {
67 w.tx_request
68 .send(WorkerRequest::Stop)
69 .map_err(|err| {
70 log::error!(
71 "Could not request worker '{}' to stop: {err:?}. Maybe it panicked previously.",
72 w.name
73 )
74 })
75 .ok();
76 }
77 }
78
79 pub fn report(&self) -> InspectorReport {
80 let mut result = InspectorReport::default();
81 for w in self.workers.iter() {
82 result.extend(w.report());
83 }
84 result
85 }
86}
87
88pub struct Worker {
89 name: String,
90 thread: Option<std::thread::JoinHandle<()>>,
91 tx_request: std::sync::mpsc::Sender<WorkerRequest>,
92 rx_reply: std::sync::mpsc::Receiver<WorkerReply>,
93}
94
95impl Worker {
96 fn new(schedule: ScheduleExecutor) -> Self {
97 let (tx_request, rx_request) = std::sync::mpsc::channel();
98 let (tx_reply, rx_reply) = std::sync::mpsc::channel();
99 let name = schedule.name().to_string();
100 let state = WorkerState {
101 schedule,
102 rx_request,
103 tx_reply,
104 };
105 Self {
106 name: name.clone(),
107 thread: Some(
108 std::thread::Builder::new()
109 .name(name)
110 .spawn(move || Self::worker_thread(state))
111 .unwrap(),
112 ),
113 tx_request,
114 rx_reply,
115 }
116 }
117
118 fn is_finished(&self) -> bool {
119 self.thread.as_ref().map_or(true, |h| h.is_finished())
120 }
121
122 fn join(&mut self) -> Result<(), ()> {
123 if let Some(thread) = self.thread.take() {
124 thread.join().map_err(|_| ())
125 } else {
126 Ok(())
127 }
128 }
129
130 fn worker_thread(mut state: WorkerState) {
131 loop {
132 let maybe_next_instant = {
134 if let Some(period) = state.schedule.period() {
135 state.schedule.last_instant().map(|t| t + period)
136 } else {
137 None
138 }
139 };
140 if let Some(next_instant) = maybe_next_instant {
141 accurate_sleep_until(next_instant);
142 }
143
144 match state.rx_request.try_recv() {
146 Ok(WorkerRequest::Stop) => break,
147 Ok(WorkerRequest::Report) => state
148 .tx_reply
149 .send(WorkerReply::Report(state.schedule.report()))
150 .unwrap(),
151 Err(_) => {
152 }
154 };
155
156 state.schedule.spin();
158 if state.schedule.is_terminated() {
159 break;
160 }
161 }
162
163 state.schedule.finalize();
164
165 state
166 .tx_reply
167 .send(WorkerReply::Report(state.schedule.report()))
168 .ok();
169 }
170
171 fn report(&self) -> InspectorReport {
172 self.tx_request.send(WorkerRequest::Report).ok();
173 match self.rx_reply.recv() {
174 Ok(WorkerReply::Report(stats)) => stats,
175 _ => panic!(),
176 }
177 }
178}