1use std::{
2 fmt::Debug,
3 hash::Hash,
4 time::{Duration, Instant},
5};
6
7use crate::{
8 backend::{Backend, BackendIncoming, BackendIncomingInternal, BackendOutgoing},
9 bus::{
10 BusEventSource, BusLocalHub, BusPubSubFeature, BusSendMultiFeature, BusSendSingleFeature,
11 BusWorker,
12 },
13};
14
/// A bus operation that targets a single channel.
#[derive(Debug)]
pub enum BusChannelControl<ChannelId, MSG> {
    /// Ask to receive messages published on the channel.
    Subscribe(ChannelId),
    /// Ask to stop receiving messages published on the channel.
    Unsubscribe(ChannelId),
    /// Publish a message on the channel. The `bool` is handed to the bus as its `safe`
    /// argument; its delivery semantics are defined by the bus implementation.
    Publish(ChannelId, bool, MSG),
}

impl<ChannelId, MSG> BusChannelControl<ChannelId, MSG> {
    /// Re-types this control by converting the channel id and the message payload through
    /// their `From` implementations.
    ///
    /// The variant and, for `Publish`, the `safe` flag are carried over unchanged.
    pub fn convert_into<NChannelId, NMSG>(self) -> BusChannelControl<NChannelId, NMSG>
    where
        NChannelId: From<ChannelId>,
        NMSG: From<MSG>,
    {
        match self {
            Self::Subscribe(id) => BusChannelControl::Subscribe(NChannelId::from(id)),
            Self::Unsubscribe(id) => BusChannelControl::Unsubscribe(NChannelId::from(id)),
            Self::Publish(id, safe, payload) => {
                let id = NChannelId::from(id);
                let payload = NMSG::from(payload);
                BusChannelControl::Publish(id, safe, payload)
            }
        }
    }
}
36
37#[derive(Debug)]
38pub enum BusControl<Owner, ChannelId, MSG> {
39 Channel(Owner, BusChannelControl<ChannelId, MSG>),
40 Broadcast(bool, MSG),
41}
42
43impl<Owner, ChannelId, MSG> BusControl<Owner, ChannelId, MSG> {
44 pub fn high_priority(&self) -> bool {
45 matches!(
46 self,
47 Self::Channel(_, BusChannelControl::Subscribe(..))
48 | Self::Channel(_, BusChannelControl::Unsubscribe(..))
49 )
50 }
51
52 pub fn convert_into<NOwner: From<Owner>, NChannelId: From<ChannelId>, NMSG: From<MSG>>(
53 self,
54 ) -> BusControl<Owner, NChannelId, NMSG> {
55 match self {
56 Self::Channel(owner, event) => BusControl::Channel(owner, event.convert_into()),
57 Self::Broadcast(safe, msg) => BusControl::Broadcast(safe, msg.into()),
58 }
59 }
60}
61
/// A message delivered from the bus to a task.
#[derive(Debug)]
pub enum BusEvent<Owner, ChannelId, MSG> {
    /// A broadcast message. The `u16` is the sender value reported by the bus for the
    /// broadcast source (presumably the sending worker's index — confirm against the bus).
    Broadcast(u16, MSG),
    /// A message received on a channel, addressed to one subscribed owner:
    /// `(subscriber, channel, message)`.
    Channel(Owner, ChannelId, MSG),
}
67
/// Control messages a worker receives on its inbound control channel.
#[derive(Debug, Clone)]
pub enum WorkerControlIn<Ext, SCfg>
where
    Ext: Clone,
{
    /// An external message to forward to the worker's inner logic.
    Ext(Ext),
    /// Spawn a new unit of work in the inner logic using this configuration.
    Spawn(SCfg),
    /// Ask the worker to report its current `WorkerStats`.
    StatsRequest,
    /// Notify the inner logic that it should shut down.
    Shutdown,
}
75
76#[derive(Debug)]
77pub enum WorkerControlOut<Ext, SCfg> {
78 Stats(WorkerStats),
79 Ext(Ext),
80 Spawn(SCfg),
81}
82
/// Snapshot of a worker's state, produced in response to a stats request.
#[derive(Debug, Default)]
pub struct WorkerStats {
    /// Number of tasks reported by the worker's inner logic.
    pub tasks: usize,
    /// Utilization figure for the worker; the unit is not defined in this file.
    pub utilization: u32,
    /// Whether the worker's inner logic reported itself as empty.
    pub is_empty: bool,
}

impl WorkerStats {
    /// Returns the worker's load, currently the task count expressed as a `u32`.
    ///
    /// The count saturates at `u32::MAX` instead of wrapping, so a very large task count
    /// can never be reported as a small load.
    pub fn load(&self) -> u32 {
        // Clamp first so the narrowing cast below is lossless on every pointer width.
        self.tasks.min(u32::MAX as usize) as u32
    }

    /// Returns the number of tasks.
    pub fn tasks(&self) -> usize {
        self.tasks
    }
}
99
100#[derive(Debug)]
101pub enum WorkerInnerInput<Owner, ExtIn, ChannelId, Event> {
102 Net(Owner, BackendIncoming),
103 Bus(BusEvent<Owner, ChannelId, Event>),
104 Ext(ExtIn),
105}
106
107pub enum WorkerInnerOutput<Owner, ExtOut, ChannelId, Event, SCfg> {
108 Net(Owner, BackendOutgoing),
109 Bus(BusControl<Owner, ChannelId, Event>),
110 Ext(bool, ExtOut),
112 Spawn(SCfg),
113 Continue,
114}
115
116pub trait WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg> {
117 fn build(worker: u16, cfg: ICfg) -> Self;
118 fn worker_index(&self) -> u16;
119 fn tasks(&self) -> usize;
120 fn is_empty(&self) -> bool;
121 fn spawn(&mut self, now: Instant, cfg: SCfg);
122 fn on_tick(&mut self, now: Instant);
123 fn on_event(&mut self, now: Instant, event: WorkerInnerInput<Owner, ExtIn, ChannelId, Event>);
124 fn on_shutdown(&mut self, now: Instant);
125 fn pop_output(
126 &mut self,
127 now: Instant,
128 ) -> Option<WorkerInnerOutput<Owner, ExtOut, ChannelId, Event, SCfg>>;
129}
130
131pub(crate) struct Worker<
132 Owner,
133 ExtIn: Clone,
134 ExtOut: Clone,
135 ChannelId: Hash + PartialEq + Eq,
136 Event,
137 Inner: WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg>,
138 ICfg,
139 SCfg,
140 B: Backend<Owner>,
141 const INNER_BUS_STACK: usize,
142> {
143 tick: Duration,
144 last_tick: Instant,
145 inner: Inner,
146 bus_local_hub: BusLocalHub<Owner, ChannelId>,
147 inner_bus: BusWorker<ChannelId, Event, INNER_BUS_STACK>,
148 backend: B,
149 worker_out: BusWorker<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
150 worker_in: BusWorker<u16, WorkerControlIn<ExtIn, SCfg>, INNER_BUS_STACK>,
151 _tmp: std::marker::PhantomData<(Owner, ICfg)>,
152}
153
154impl<
155 Owner: Debug + Clone + Copy + PartialEq,
156 ExtIn: Clone,
157 ExtOut: Clone,
158 ChannelId: Hash + PartialEq + Eq + Debug + Copy,
159 Event: Clone,
160 Inner: WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg>,
161 B: Backend<Owner>,
162 ICfg,
163 SCfg,
164 const INNER_BUS_STACK: usize,
165 > Worker<Owner, ExtIn, ExtOut, ChannelId, Event, Inner, ICfg, SCfg, B, INNER_BUS_STACK>
166{
167 pub fn new(
168 tick: Duration,
169 inner: Inner,
170 inner_bus: BusWorker<ChannelId, Event, INNER_BUS_STACK>,
171 worker_out: BusWorker<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
172 worker_in: BusWorker<u16, WorkerControlIn<ExtIn, SCfg>, INNER_BUS_STACK>,
173 ) -> Self {
174 let backend = B::default();
175 inner_bus.set_awaker(backend.create_awaker());
176 Self {
177 inner,
178 tick,
179 last_tick: Instant::now() - 2 * tick, bus_local_hub: Default::default(),
181 inner_bus,
182 backend,
183 worker_out,
184 worker_in,
185 _tmp: Default::default(),
186 }
187 }
188
189 pub fn init(&mut self) {
190 let now = Instant::now();
191 self.pop_inner(now);
192 }
193
194 pub fn process(&mut self) {
195 let now = Instant::now();
196 while let Some((_source, msg)) = self.worker_in.recv() {
197 match msg {
198 WorkerControlIn::Ext(ext) => {
199 self.inner.on_event(now, WorkerInnerInput::Ext(ext));
200 self.pop_inner(now);
201 }
202 WorkerControlIn::Spawn(cfg) => {
203 self.inner.spawn(now, cfg);
204 }
205 WorkerControlIn::StatsRequest => {
206 let stats = WorkerStats {
207 tasks: self.inner.tasks(),
208 utilization: 0, is_empty: self.inner.is_empty(),
210 };
211 self.worker_out
212 .send(0, true, WorkerControlOut::Stats(stats))
213 .expect("Should send success with safe flag");
214 }
215 WorkerControlIn::Shutdown => {
216 self.inner.on_shutdown(now);
217 }
218 }
219 }
220
221 if now.duration_since(self.last_tick) >= self.tick {
222 self.last_tick = now;
223 self.inner.on_tick(now);
224 self.pop_inner(now);
225 }
226
227 let remain_time = Duration::from_millis(1)
229 .checked_sub(now.elapsed())
230 .unwrap_or_else(|| Duration::from_micros(1));
231
232 self.backend.poll_incoming(remain_time);
233
234 while let Some(event) = self.backend.pop_incoming() {
235 match event {
236 BackendIncomingInternal::Awake => self.on_awake(now),
237 BackendIncomingInternal::Event(owner, event) => {
238 self.inner
239 .on_event(now, WorkerInnerInput::Net(owner, event));
240 self.pop_inner(now);
241 }
242 }
243 }
244
245 self.backend.finish_incoming_cycle();
246 self.backend.finish_outgoing_cycle();
247 if now.elapsed().as_millis() > 15 {
248 log::warn!("Worker process too long: {}", now.elapsed().as_millis());
249 }
250 }
251
252 fn on_awake(&mut self, now: Instant) {
253 while let Some((source, event)) = self.inner_bus.recv() {
254 match source {
255 BusEventSource::Channel(_, channel) => {
256 if let Some(subscribers) = self.bus_local_hub.get_subscribers(channel) {
257 for subscriber in subscribers {
258 self.inner.on_event(
259 now,
260 WorkerInnerInput::Bus(BusEvent::Channel(
261 subscriber,
262 channel,
263 event.clone(),
264 )),
265 );
266 self.pop_inner(now);
267 }
268 }
269 }
270 BusEventSource::Broadcast(from) => {
271 self.inner.on_event(
272 now,
273 WorkerInnerInput::Bus(BusEvent::Broadcast(from as u16, event.clone())),
274 );
275 self.pop_inner(now);
276 }
277 _ => panic!(
278 "Invalid channel source for task {:?}, only support channel",
279 source
280 ),
281 }
282 }
283 }
284
285 fn pop_inner(&mut self, now: Instant) {
286 while let Some(out) = self.inner.pop_output(now) {
287 self.process_inner_output(out);
288 }
289 }
290
291 fn process_inner_output(
292 &mut self,
293 out: WorkerInnerOutput<Owner, ExtOut, ChannelId, Event, SCfg>,
294 ) {
295 let worker = self.inner.worker_index();
296 match out {
297 WorkerInnerOutput::Spawn(cfg) => {
298 self.worker_out
299 .send(0, true, WorkerControlOut::Spawn(cfg))
300 .expect("Should send success with safe flag");
301 }
302 WorkerInnerOutput::Net(owner, action) => {
303 self.backend.on_action(owner, action);
304 }
305 WorkerInnerOutput::Bus(event) => match event {
306 BusControl::Channel(owner, BusChannelControl::Subscribe(channel)) => {
307 if self.bus_local_hub.subscribe(owner, channel) {
308 log::info!("Worker {worker} subscribe to channel {:?}", channel);
309 self.inner_bus.subscribe(channel);
310 }
311 }
312 BusControl::Channel(owner, BusChannelControl::Unsubscribe(channel)) => {
313 if self.bus_local_hub.unsubscribe(owner, channel) {
314 log::info!("Worker {worker} unsubscribe from channel {:?}", channel);
315 self.inner_bus.unsubscribe(channel);
316 }
317 }
318 BusControl::Channel(_owner, BusChannelControl::Publish(channel, safe, msg)) => {
319 self.inner_bus.publish(channel, safe, msg);
320 }
321 BusControl::Broadcast(safe, msg) => {
322 self.inner_bus.broadcast(safe, msg);
323 }
324 },
325 WorkerInnerOutput::Ext(safe, ext) => {
326 log::debug!("Worker {worker} send external");
328 if let Err(e) = self.worker_out.send(0, safe, WorkerControlOut::Ext(ext)) {
329 log::error!("Failed to send external: {:?}", e);
330 }
331 }
332 WorkerInnerOutput::Continue => {}
333 }
334 }
335}