sans_io_runtime/
controller.rs

1use std::{fmt::Debug, hash::Hash, time::Duration};
2
3use crate::{
4    backend::Backend,
5    bus::{BusEventSource, BusSendSingleFeature, BusSystemBuilder, BusWorker},
6    collections::DynamicDeque,
7    worker::{self, WorkerControlIn, WorkerControlOut, WorkerInner, WorkerStats},
8};
9
10const DEFAULT_STACK_SIZE: usize = 12 * 1024 * 1024;
11
12struct WorkerContainer {
13    _join: std::thread::JoinHandle<()>,
14    stats: WorkerStats,
15}
16
17pub struct Controller<
18    ExtIn: Clone,
19    ExtOut: Clone,
20    SCfg,
21    ChannelId,
22    Event,
23    const INNER_BUS_STACK: usize,
24> {
25    worker_inner_bus: BusSystemBuilder<ChannelId, Event, INNER_BUS_STACK>,
26    worker_control_bus: BusSystemBuilder<u16, WorkerControlIn<ExtIn, SCfg>, INNER_BUS_STACK>,
27    worker_event_bus: BusSystemBuilder<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
28    worker_event: BusWorker<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
29    worker_threads: Vec<WorkerContainer>,
30    output: DynamicDeque<ExtOut, 16>,
31    shutdown: bool,
32}
33
34impl<ExtIn: Clone, ExtOut: Clone, SCfg, ChannelId, Event, const INNER_BUS_STACK: usize> Default
35    for Controller<ExtIn, ExtOut, SCfg, ChannelId, Event, INNER_BUS_STACK>
36{
37    fn default() -> Self {
38        let worker_control_bus = BusSystemBuilder::default();
39        let mut worker_event_bus = BusSystemBuilder::default();
40        let worker_event = worker_event_bus.new_worker();
41
42        Self {
43            worker_inner_bus: BusSystemBuilder::default(),
44            worker_control_bus,
45            worker_event_bus,
46            worker_event,
47            worker_threads: Vec::new(),
48            output: DynamicDeque::default(),
49            shutdown: false,
50        }
51    }
52}
53
54impl<
55        ExtIn: 'static + Send + Sync + Clone,
56        ExtOut: 'static + Send + Sync + Clone,
57        SCfg: 'static + Send + Sync,
58        ChannelId: 'static + Debug + Clone + Copy + Hash + PartialEq + Eq + Send + Sync,
59        Event: 'static + Send + Sync + Clone,
60        const INNER_BUS_STACK: usize,
61    > Controller<ExtIn, ExtOut, SCfg, ChannelId, Event, INNER_BUS_STACK>
62{
63    pub fn add_worker<
64        Owner: Debug + Clone + Copy + PartialEq,
65        ICfg: 'static + Send + Sync,
66        Inner: WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg>,
67        B: Backend<Owner>,
68    >(
69        &mut self,
70        tick: Duration,
71        cfg: ICfg,
72        stack_size: Option<usize>,
73    ) {
74        let worker_index = self.worker_threads.len();
75        let worker_in = self.worker_control_bus.new_worker();
76        let worker_out = self.worker_event_bus.new_worker();
77        let worker_inner = self.worker_inner_bus.new_worker();
78
79        let stack_size = stack_size.unwrap_or(DEFAULT_STACK_SIZE);
80        log::info!("create worker with stack size: {}", stack_size);
81        let (tx, rx) = std::sync::mpsc::channel::<()>();
82        let join = std::thread::Builder::new()
83            .stack_size(stack_size)
84            .spawn(move || {
85                let mut worker =
86                    worker::Worker::<Owner, _, _, _, _, _, _, _, B, INNER_BUS_STACK>::new(
87                        tick,
88                        Inner::build(worker_in.leg_index() as u16, cfg),
89                        worker_inner,
90                        worker_out,
91                        worker_in,
92                    );
93                log::info!("Worker {worker_index} started");
94                tx.send(()).expect("Should send start signal");
95                worker.init();
96                loop {
97                    worker.process();
98                }
99            })
100            .expect("Should spawn worker thread");
101        rx.recv().expect("Should receive start signal");
102        self.worker_threads.push(WorkerContainer {
103            _join: join,
104            stats: Default::default(),
105        });
106    }
107
108    pub fn process(&mut self) -> Option<()> {
109        if self.shutdown && self.count_running_workers() == 0 {
110            return None;
111        }
112
113        for i in 0..self.worker_threads.len() {
114            if let Err(e) = self
115                .worker_control_bus
116                .send(i, false, WorkerControlIn::StatsRequest)
117            {
118                log::error!("Failed to send stats request to worker {i}: {:?}", e);
119            }
120        }
121        while let Some((source, event)) = self.worker_event.recv() {
122            match (source, event) {
123                (BusEventSource::Direct(source_leg), event) => match event {
124                    WorkerControlOut::Stats(stats) => {
125                        log::trace!("Worker {source_leg} stats: {:?}", stats);
126                        // source_leg is 1-based because of 0 is for controller
127                        // TODO avoid -1, should not hack this way
128                        self.worker_threads[source_leg - 1].stats = stats;
129                    }
130                    WorkerControlOut::Ext(event) => {
131                        self.output.push_back(event);
132                    }
133                    WorkerControlOut::Spawn(cfg) => {
134                        self.spawn(cfg);
135                    }
136                },
137                _ => {
138                    unreachable!("WorkerControlOut only has Stats variant");
139                }
140            }
141        }
142
143        Some(())
144    }
145
146    pub fn spawn(&mut self, cfg: SCfg) {
147        // get worker index with lowest load
148        let best_worker = self
149            .worker_threads
150            .iter()
151            .enumerate()
152            .min_by_key(|(_, w)| w.stats.load())
153            .map(|(i, _)| i)
154            .expect("Should have at least one worker");
155        self.worker_threads[best_worker].stats.tasks += 1;
156        if let Err(e) = self
157            .worker_control_bus
158            .send(best_worker, true, WorkerControlIn::Spawn(cfg))
159        {
160            log::error!("Failed to spawn task: {:?}", e);
161        }
162    }
163
164    pub fn shutdown(&mut self) {
165        self.shutdown = true;
166        for i in 0..self.worker_threads.len() {
167            if let Err(e) = self
168                .worker_control_bus
169                .send(i, true, WorkerControlIn::Shutdown)
170            {
171                log::error!("Failed to send shutdown to task: {:?}", e);
172            }
173        }
174    }
175
176    pub fn send_to(&mut self, worker: u16, ext: ExtIn) {
177        if let Err(e) =
178            self.worker_control_bus
179                .send(worker as usize, true, WorkerControlIn::Ext(ext))
180        {
181            log::error!("Failed to send to task: {:?}", e);
182        }
183    }
184
185    pub fn send_to_best(&mut self, ext: ExtIn) {
186        // get worker index with lowest load
187        let best_worker = self
188            .worker_threads
189            .iter()
190            .enumerate()
191            .min_by_key(|(_, w)| w.stats.load())
192            .map(|(i, _)| i)
193            .expect("Should have at least one worker");
194        if let Err(e) = self
195            .worker_control_bus
196            .send(best_worker, true, WorkerControlIn::Ext(ext))
197        {
198            log::error!("Failed to send to task: {:?}", e);
199        }
200    }
201
202    pub fn pop_event(&mut self) -> Option<ExtOut> {
203        self.output.pop_front()
204    }
205
206    fn count_running_workers(&self) -> usize {
207        self.worker_threads
208            .iter()
209            .filter(|w| w.stats.tasks() > 0 && !w.stats.is_empty)
210            .count()
211    }
212}