1use std::{fmt::Debug, hash::Hash, time::Duration};
2
3use crate::{
4 backend::Backend,
5 bus::{BusEventSource, BusSendSingleFeature, BusSystemBuilder, BusWorker},
6 collections::DynamicDeque,
7 worker::{self, WorkerControlIn, WorkerControlOut, WorkerInner, WorkerStats},
8};
9
10const DEFAULT_STACK_SIZE: usize = 12 * 1024 * 1024;
11
12struct WorkerContainer {
13 _join: std::thread::JoinHandle<()>,
14 stats: WorkerStats,
15}
16
17pub struct Controller<
18 ExtIn: Clone,
19 ExtOut: Clone,
20 SCfg,
21 ChannelId,
22 Event,
23 const INNER_BUS_STACK: usize,
24> {
25 worker_inner_bus: BusSystemBuilder<ChannelId, Event, INNER_BUS_STACK>,
26 worker_control_bus: BusSystemBuilder<u16, WorkerControlIn<ExtIn, SCfg>, INNER_BUS_STACK>,
27 worker_event_bus: BusSystemBuilder<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
28 worker_event: BusWorker<u16, WorkerControlOut<ExtOut, SCfg>, INNER_BUS_STACK>,
29 worker_threads: Vec<WorkerContainer>,
30 output: DynamicDeque<ExtOut, 16>,
31 shutdown: bool,
32}
33
34impl<ExtIn: Clone, ExtOut: Clone, SCfg, ChannelId, Event, const INNER_BUS_STACK: usize> Default
35 for Controller<ExtIn, ExtOut, SCfg, ChannelId, Event, INNER_BUS_STACK>
36{
37 fn default() -> Self {
38 let worker_control_bus = BusSystemBuilder::default();
39 let mut worker_event_bus = BusSystemBuilder::default();
40 let worker_event = worker_event_bus.new_worker();
41
42 Self {
43 worker_inner_bus: BusSystemBuilder::default(),
44 worker_control_bus,
45 worker_event_bus,
46 worker_event,
47 worker_threads: Vec::new(),
48 output: DynamicDeque::default(),
49 shutdown: false,
50 }
51 }
52}
53
54impl<
55 ExtIn: 'static + Send + Sync + Clone,
56 ExtOut: 'static + Send + Sync + Clone,
57 SCfg: 'static + Send + Sync,
58 ChannelId: 'static + Debug + Clone + Copy + Hash + PartialEq + Eq + Send + Sync,
59 Event: 'static + Send + Sync + Clone,
60 const INNER_BUS_STACK: usize,
61 > Controller<ExtIn, ExtOut, SCfg, ChannelId, Event, INNER_BUS_STACK>
62{
63 pub fn add_worker<
64 Owner: Debug + Clone + Copy + PartialEq,
65 ICfg: 'static + Send + Sync,
66 Inner: WorkerInner<Owner, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg>,
67 B: Backend<Owner>,
68 >(
69 &mut self,
70 tick: Duration,
71 cfg: ICfg,
72 stack_size: Option<usize>,
73 ) {
74 let worker_index = self.worker_threads.len();
75 let worker_in = self.worker_control_bus.new_worker();
76 let worker_out = self.worker_event_bus.new_worker();
77 let worker_inner = self.worker_inner_bus.new_worker();
78
79 let stack_size = stack_size.unwrap_or(DEFAULT_STACK_SIZE);
80 log::info!("create worker with stack size: {}", stack_size);
81 let (tx, rx) = std::sync::mpsc::channel::<()>();
82 let join = std::thread::Builder::new()
83 .stack_size(stack_size)
84 .spawn(move || {
85 let mut worker =
86 worker::Worker::<Owner, _, _, _, _, _, _, _, B, INNER_BUS_STACK>::new(
87 tick,
88 Inner::build(worker_in.leg_index() as u16, cfg),
89 worker_inner,
90 worker_out,
91 worker_in,
92 );
93 log::info!("Worker {worker_index} started");
94 tx.send(()).expect("Should send start signal");
95 worker.init();
96 loop {
97 worker.process();
98 }
99 })
100 .expect("Should spawn worker thread");
101 rx.recv().expect("Should receive start signal");
102 self.worker_threads.push(WorkerContainer {
103 _join: join,
104 stats: Default::default(),
105 });
106 }
107
108 pub fn process(&mut self) -> Option<()> {
109 if self.shutdown && self.count_running_workers() == 0 {
110 return None;
111 }
112
113 for i in 0..self.worker_threads.len() {
114 if let Err(e) = self
115 .worker_control_bus
116 .send(i, false, WorkerControlIn::StatsRequest)
117 {
118 log::error!("Failed to send stats request to worker {i}: {:?}", e);
119 }
120 }
121 while let Some((source, event)) = self.worker_event.recv() {
122 match (source, event) {
123 (BusEventSource::Direct(source_leg), event) => match event {
124 WorkerControlOut::Stats(stats) => {
125 log::trace!("Worker {source_leg} stats: {:?}", stats);
126 self.worker_threads[source_leg - 1].stats = stats;
129 }
130 WorkerControlOut::Ext(event) => {
131 self.output.push_back(event);
132 }
133 WorkerControlOut::Spawn(cfg) => {
134 self.spawn(cfg);
135 }
136 },
137 _ => {
138 unreachable!("WorkerControlOut only has Stats variant");
139 }
140 }
141 }
142
143 Some(())
144 }
145
146 pub fn spawn(&mut self, cfg: SCfg) {
147 let best_worker = self
149 .worker_threads
150 .iter()
151 .enumerate()
152 .min_by_key(|(_, w)| w.stats.load())
153 .map(|(i, _)| i)
154 .expect("Should have at least one worker");
155 self.worker_threads[best_worker].stats.tasks += 1;
156 if let Err(e) = self
157 .worker_control_bus
158 .send(best_worker, true, WorkerControlIn::Spawn(cfg))
159 {
160 log::error!("Failed to spawn task: {:?}", e);
161 }
162 }
163
164 pub fn shutdown(&mut self) {
165 self.shutdown = true;
166 for i in 0..self.worker_threads.len() {
167 if let Err(e) = self
168 .worker_control_bus
169 .send(i, true, WorkerControlIn::Shutdown)
170 {
171 log::error!("Failed to send shutdown to task: {:?}", e);
172 }
173 }
174 }
175
176 pub fn send_to(&mut self, worker: u16, ext: ExtIn) {
177 if let Err(e) =
178 self.worker_control_bus
179 .send(worker as usize, true, WorkerControlIn::Ext(ext))
180 {
181 log::error!("Failed to send to task: {:?}", e);
182 }
183 }
184
185 pub fn send_to_best(&mut self, ext: ExtIn) {
186 let best_worker = self
188 .worker_threads
189 .iter()
190 .enumerate()
191 .min_by_key(|(_, w)| w.stats.load())
192 .map(|(i, _)| i)
193 .expect("Should have at least one worker");
194 if let Err(e) = self
195 .worker_control_bus
196 .send(best_worker, true, WorkerControlIn::Ext(ext))
197 {
198 log::error!("Failed to send to task: {:?}", e);
199 }
200 }
201
202 pub fn pop_event(&mut self) -> Option<ExtOut> {
203 self.output.pop_front()
204 }
205
206 fn count_running_workers(&self) -> usize {
207 self.worker_threads
208 .iter()
209 .filter(|w| w.stats.tasks() > 0 && !w.stats.is_empty)
210 .count()
211 }
212}