// sans_io_runtime/worker.rs

1use std::{
2    fmt::Debug,
3    hash::Hash,
4    time::{Duration, Instant},
5};
6
7use crate::{
8    backend::{Backend, BackendIncoming, BackendIncomingInternal, BackendOutgoing},
9    bus::{
10        BusEventSource, BusLocalHub, BusPubSubFeature, BusSendMultiFeature, BusSendSingleFeature,
11        BusWorker,
12    },
13};
14
/// A request addressed to a single bus channel.
#[derive(Debug)]
pub enum BusChannelControl<ChannelId, MSG> {
    /// Start receiving messages published on the given channel.
    Subscribe(ChannelId),
    /// Stop receiving messages published on the given channel.
    Unsubscribe(ChannelId),
    /// Publish a message: the fields are the channel id, the `safe` flag and the
    /// message itself, in that order.
    Publish(ChannelId, bool, MSG),
}

impl<ChannelId, MSG> BusChannelControl<ChannelId, MSG> {
    /// Re-types this request by converting the channel id and the message through
    /// their `From` implementations. The variant and the `safe` flag are preserved.
    pub fn convert_into<NChannelId: From<ChannelId>, NMSG: From<MSG>>(
        self,
    ) -> BusChannelControl<NChannelId, NMSG> {
        match self {
            Self::Publish(channel, safe, msg) => {
                let channel = NChannelId::from(channel);
                let msg = NMSG::from(msg);
                BusChannelControl::Publish(channel, safe, msg)
            }
            Self::Subscribe(channel) => BusChannelControl::Subscribe(NChannelId::from(channel)),
            Self::Unsubscribe(channel) => BusChannelControl::Unsubscribe(NChannelId::from(channel)),
        }
    }
}
36
37#[derive(Debug)]
38pub enum BusControl<Owner, ChannelId, MSG> {
39    Channel(Owner, BusChannelControl<ChannelId, MSG>),
40    Broadcast(bool, MSG),
41}
42
43impl<Owner, ChannelId, MSG> BusControl<Owner, ChannelId, MSG> {
44    pub fn high_priority(&self) -> bool {
45        matches!(
46            self,
47            Self::Channel(_, BusChannelControl::Subscribe(..))
48                | Self::Channel(_, BusChannelControl::Unsubscribe(..))
49        )
50    }
51
52    pub fn convert_into<NOwner: From<Owner>, NChannelId: From<ChannelId>, NMSG: From<MSG>>(
53        self,
54    ) -> BusControl<Owner, NChannelId, NMSG> {
55        match self {
56            Self::Channel(owner, event) => BusControl::Channel(owner, event.convert_into()),
57            Self::Broadcast(safe, msg) => BusControl::Broadcast(safe, msg.into()),
58        }
59    }
60}
61
/// A bus message delivered to the worker's inner logic.
#[derive(Debug)]
pub enum BusEvent<Owner, ChannelId, MSG> {
    /// A broadcast message. The `u16` is the broadcast source reported by the bus,
    /// cast to `u16` by the worker.
    Broadcast(u16, MSG),
    /// A channel message: the subscribed owner it is delivered to, the channel it
    /// was published on, and the message.
    Channel(Owner, ChannelId, MSG),
}
67
/// Control messages a worker receives on its inbound control bus.
#[derive(Debug, Clone)]
pub enum WorkerControlIn<Ext: Clone, SCfg> {
    /// An external message, forwarded to the inner logic as an `Ext` input.
    Ext(Ext),
    /// Spawn a new task on this worker from the given configuration.
    Spawn(SCfg),
    /// Ask the worker to report its current statistics.
    StatsRequest,
    /// Ask the worker's inner logic to shut down.
    Shutdown,
}
75
76#[derive(Debug)]
77pub enum WorkerControlOut<Ext, SCfg> {
78    Stats(WorkerStats),
79    Ext(Ext),
80    Spawn(SCfg),
81}
82
/// A snapshot of a worker's state, produced in answer to a stats request.
#[derive(Debug, Default)]
pub struct WorkerStats {
    /// Number of tasks reported by the worker's inner logic.
    pub tasks: usize,
    /// Thread utilization. The worker currently always reports `0` here.
    pub utilization: u32,
    /// Whether the worker's inner logic reports itself as empty.
    pub is_empty: bool,
}

impl WorkerStats {
    /// Returns the worker load, currently the task count as a `u32`.
    ///
    /// The count saturates at `u32::MAX` rather than wrapping, so an extremely
    /// loaded worker can never report a small load.
    pub fn load(&self) -> u32 {
        // The clamp guarantees the value fits, so the cast below cannot truncate.
        self.tasks.min(u32::MAX as usize) as u32
    }

    /// Returns the number of tasks.
    pub fn tasks(&self) -> usize {
        self.tasks
    }
}
99
100#[derive(Debug)]
101pub enum WorkerInnerInput<Owner, ExtIn, ChannelId, Event> {
102    Net(Owner, BackendIncoming),
103    Bus(BusEvent<Owner, ChannelId, Event>),
104    Ext(ExtIn),
105}
106
107pub enum WorkerInnerOutput<Owner, ExtOut, ChannelId, Event, SCfg> {
108    Net(Owner, BackendOutgoing),
109    Bus(BusControl<Owner, ChannelId, Event>),
110    /// First bool is message need to safe to send or not, second is the message
111    Ext(bool, ExtOut),
112    Spawn(SCfg),
113    Continue,
114}
115
116pub trait WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg> {
117    fn build(worker: u16, cfg: ICfg) -> Self;
118    fn worker_index(&self) -> u16;
119    fn tasks(&self) -> usize;
120    fn is_empty(&self) -> bool;
121    fn spawn(&mut self, now: Instant, cfg: SCfg);
122    fn on_tick(&mut self, now: Instant);
123    fn on_event(&mut self, now: Instant, event: WorkerInnerInput<Owner, ExtIn, ChannelId, Event>);
124    fn on_shutdown(&mut self, now: Instant);
125    fn pop_output(
126        &mut self,
127        now: Instant,
128    ) -> Option<WorkerInnerOutput<Owner, ExtOut, ChannelId, Event, SCfg>>;
129}
130
131pub(crate) struct Worker<
132    Owner,
133    ExtIn: Clone,
134    ExtOut: Clone,
135    ChannelId: Hash + PartialEq + Eq,
136    Event,
137    Inner: WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg>,
138    ICfg,
139    SCfg,
140    B: Backend<Owner>,
141    const INNER_BUS_STACK: usize,
142> {
143    tick: Duration,
144    last_tick: Instant,
145    inner: Inner,
146    bus_local_hub: BusLocalHub<Owner, ChannelId>,
147    inner_bus: BusWorker<ChannelId, Event, INNER_BUS_STACK>,
148    backend: B,
149    worker_out: BusWorker<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
150    worker_in: BusWorker<u16, WorkerControlIn<ExtIn, SCfg>, INNER_BUS_STACK>,
151    _tmp: std::marker::PhantomData<(Owner, ICfg)>,
152}
153
154impl<
155        Owner: Debug + Clone + Copy + PartialEq,
156        ExtIn: Clone,
157        ExtOut: Clone,
158        ChannelId: Hash + PartialEq + Eq + Debug + Copy,
159        Event: Clone,
160        Inner: WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg>,
161        B: Backend<Owner>,
162        ICfg,
163        SCfg,
164        const INNER_BUS_STACK: usize,
165    > Worker<Owner, ExtIn, ExtOut, ChannelId, Event, Inner, ICfg, SCfg, B, INNER_BUS_STACK>
166{
167    pub fn new(
168        tick: Duration,
169        inner: Inner,
170        inner_bus: BusWorker<ChannelId, Event, INNER_BUS_STACK>,
171        worker_out: BusWorker<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
172        worker_in: BusWorker<u16, WorkerControlIn<ExtIn, SCfg>, INNER_BUS_STACK>,
173    ) -> Self {
174        let backend = B::default();
175        inner_bus.set_awaker(backend.create_awaker());
176        Self {
177            inner,
178            tick,
179            last_tick: Instant::now() - 2 * tick, //for ensure first tick as fast as possible
180            bus_local_hub: Default::default(),
181            inner_bus,
182            backend,
183            worker_out,
184            worker_in,
185            _tmp: Default::default(),
186        }
187    }
188
189    pub fn init(&mut self) {
190        let now = Instant::now();
191        self.pop_inner(now);
192    }
193
194    pub fn process(&mut self) {
195        let now = Instant::now();
196        while let Some((_source, msg)) = self.worker_in.recv() {
197            match msg {
198                WorkerControlIn::Ext(ext) => {
199                    self.inner.on_event(now, WorkerInnerInput::Ext(ext));
200                    self.pop_inner(now);
201                }
202                WorkerControlIn::Spawn(cfg) => {
203                    self.inner.spawn(now, cfg);
204                }
205                WorkerControlIn::StatsRequest => {
206                    let stats = WorkerStats {
207                        tasks: self.inner.tasks(),
208                        utilization: 0, //TODO measure this thread utilization
209                        is_empty: self.inner.is_empty(),
210                    };
211                    self.worker_out
212                        .send(0, true, WorkerControlOut::Stats(stats))
213                        .expect("Should send success with safe flag");
214                }
215                WorkerControlIn::Shutdown => {
216                    self.inner.on_shutdown(now);
217                }
218            }
219        }
220
221        if now.duration_since(self.last_tick) >= self.tick {
222            self.last_tick = now;
223            self.inner.on_tick(now);
224            self.pop_inner(now);
225        }
226
227        //one cycle is process in 1ms then we minus 1ms with eslaped time
228        let remain_time = Duration::from_millis(1)
229            .checked_sub(now.elapsed())
230            .unwrap_or_else(|| Duration::from_micros(1));
231
232        self.backend.poll_incoming(remain_time);
233
234        while let Some(event) = self.backend.pop_incoming() {
235            match event {
236                BackendIncomingInternal::Awake => self.on_awake(now),
237                BackendIncomingInternal::Event(owner, event) => {
238                    self.inner
239                        .on_event(now, WorkerInnerInput::Net(owner, event));
240                    self.pop_inner(now);
241                }
242            }
243        }
244
245        self.backend.finish_incoming_cycle();
246        self.backend.finish_outgoing_cycle();
247        if now.elapsed().as_millis() > 15 {
248            log::warn!("Worker process too long: {}", now.elapsed().as_millis());
249        }
250    }
251
252    fn on_awake(&mut self, now: Instant) {
253        while let Some((source, event)) = self.inner_bus.recv() {
254            match source {
255                BusEventSource::Channel(_, channel) => {
256                    if let Some(subscribers) = self.bus_local_hub.get_subscribers(channel) {
257                        for subscriber in subscribers {
258                            self.inner.on_event(
259                                now,
260                                WorkerInnerInput::Bus(BusEvent::Channel(
261                                    subscriber,
262                                    channel,
263                                    event.clone(),
264                                )),
265                            );
266                            self.pop_inner(now);
267                        }
268                    }
269                }
270                BusEventSource::Broadcast(from) => {
271                    self.inner.on_event(
272                        now,
273                        WorkerInnerInput::Bus(BusEvent::Broadcast(from as u16, event.clone())),
274                    );
275                    self.pop_inner(now);
276                }
277                _ => panic!(
278                    "Invalid channel source for task {:?}, only support channel",
279                    source
280                ),
281            }
282        }
283    }
284
285    fn pop_inner(&mut self, now: Instant) {
286        while let Some(out) = self.inner.pop_output(now) {
287            self.process_inner_output(out);
288        }
289    }
290
291    fn process_inner_output(
292        &mut self,
293        out: WorkerInnerOutput<Owner, ExtOut, ChannelId, Event, SCfg>,
294    ) {
295        let worker = self.inner.worker_index();
296        match out {
297            WorkerInnerOutput::Spawn(cfg) => {
298                self.worker_out
299                    .send(0, true, WorkerControlOut::Spawn(cfg))
300                    .expect("Should send success with safe flag");
301            }
302            WorkerInnerOutput::Net(owner, action) => {
303                self.backend.on_action(owner, action);
304            }
305            WorkerInnerOutput::Bus(event) => match event {
306                BusControl::Channel(owner, BusChannelControl::Subscribe(channel)) => {
307                    if self.bus_local_hub.subscribe(owner, channel) {
308                        log::info!("Worker {worker} subscribe to channel {:?}", channel);
309                        self.inner_bus.subscribe(channel);
310                    }
311                }
312                BusControl::Channel(owner, BusChannelControl::Unsubscribe(channel)) => {
313                    if self.bus_local_hub.unsubscribe(owner, channel) {
314                        log::info!("Worker {worker} unsubscribe from channel {:?}", channel);
315                        self.inner_bus.unsubscribe(channel);
316                    }
317                }
318                BusControl::Channel(_owner, BusChannelControl::Publish(channel, safe, msg)) => {
319                    self.inner_bus.publish(channel, safe, msg);
320                }
321                BusControl::Broadcast(safe, msg) => {
322                    self.inner_bus.broadcast(safe, msg);
323                }
324            },
325            WorkerInnerOutput::Ext(safe, ext) => {
326                // TODO don't hardcode 0
327                log::debug!("Worker {worker} send external");
328                if let Err(e) = self.worker_out.send(0, safe, WorkerControlOut::Ext(ext)) {
329                    log::error!("Failed to send external: {:?}", e);
330                }
331            }
332            WorkerInnerOutput::Continue => {}
333        }
334    }
335}