1pub mod backend;
2mod buffer;
3pub mod bus;
4pub mod collections;
5mod controller;
6mod task;
7mod trace;
8mod worker;
9
10pub use buffer::*;
11pub use controller::Controller;
12pub use task::{
13 group::{TaskGroup, TaskGroupOutput},
14 switcher::{TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild},
15 Task,
16};
17pub use trace::*;
18pub use worker::{
19 BusChannelControl, BusControl, BusEvent, WorkerInner, WorkerInnerInput, WorkerInnerOutput,
20 WorkerStats,
21};
22
23#[macro_export]
24macro_rules! return_if_none {
25 ($option:expr) => {
26 match $option {
27 Some(val) => val,
28 None => return,
29 }
30 };
31}
32
33#[macro_export]
34macro_rules! return_if_some {
35 ($option:expr) => {
36 let out = $option;
37 if out.is_some() {
38 return out;
39 }
40 };
41}
42
43#[macro_export]
44macro_rules! return_if_err {
45 ($option:expr) => {
46 match $option {
47 Ok(val) => val,
48 Err(_) => return,
49 }
50 };
51}
52
53#[macro_export]
54macro_rules! group_owner_type {
55 ($name:ident) => {
56 #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
57 pub struct $name(usize);
58
59 #[allow(dead_code)]
60 impl $name {
61 fn build(index: usize) -> Self {
62 Self(index)
63 }
64 pub fn index(&self) -> usize {
65 self.0
66 }
67 }
68
69 impl From<usize> for $name {
70 fn from(value: usize) -> Self {
71 Self(value)
72 }
73 }
74 };
75}
76
77#[macro_export]
78macro_rules! group_task {
79 ($name:ident, $task:ty, $input:ty, $output:ty) => {
80 pub struct $name {
81 tasks: Vec<Option<$task>>,
82 switcher: TaskSwitcher,
83 }
84
85 impl Default for $name {
86 fn default() -> Self {
87 Self {
88 tasks: Vec::new(),
89 switcher: TaskSwitcher::new(0),
90 }
91 }
92 }
93
94 impl $name {
95 pub fn tasks(&self) -> usize {
97 let tasks = self.tasks.iter().filter(|x| x.is_some()).count();
99 tasks
100 }
101
102 pub fn has_task(&self, index: usize) -> bool {
104 matches!(self.tasks.get(index), Some(Some(_)))
105 }
106
107 pub fn add_task(&mut self, task: $task) -> usize {
109 for (index, slot) in self.tasks.iter_mut().enumerate() {
110 if slot.is_none() {
111 *slot = Some(task);
112 return index;
113 }
114 }
115
116 self.tasks.push(Some(task));
117 self.switcher.set_tasks(self.tasks.len());
118 self.tasks.len() - 1
119 }
120
121 pub fn remove_task(&mut self, index: usize) {
123 self.tasks
124 .get_mut(index)
125 .expect("Should have task when remove")
126 .take();
127 while let Some(None) = self.tasks.last() {
128 self.tasks.pop();
129 }
130 self.switcher.set_tasks(self.tasks.len());
131 }
132
133 pub fn on_tick(&mut self, now: std::time::Instant) {
134 self.switcher.flag_all();
135 for index in 0..self.switcher.tasks() {
136 if let Some(Some(task)) = self.tasks.get_mut(index) {
137 task.on_tick(now);
138 }
139 }
140 }
141
142 pub fn on_event<'a>(
143 &mut self,
144 now: std::time::Instant,
145 index: usize,
146 input: $input,
147 ) -> Option<()> {
148 let task = self.tasks.get_mut(index)?.as_mut()?;
149 self.switcher.flag_task(index);
150 task.on_event(now, input);
151 Some(())
152 }
153
154 pub fn pop_output<'a>(&mut self, now: std::time::Instant) -> Option<(usize, $output)> {
155 while let Some(index) = self.switcher.current() {
156 let slot = self.tasks.get_mut(index);
157 if let Some(Some(slot)) = slot {
158 if let Some(out) = slot.pop_output(now) {
159 return Some((index, out));
160 } else {
161 self.switcher.finished(index);
162 }
163 } else {
164 self.switcher.finished(index);
165 }
166 }
167 None
168 }
169
170 pub fn on_shutdown(&mut self, now: std::time::Instant) {
172 self.switcher.flag_all();
173 for index in 0..self.switcher.tasks() {
174 log::info!("Group kill tasks {}/{}", index, self.switcher.tasks());
175 if let Some(Some(task)) = self.tasks.get_mut(index) {
176 task.on_shutdown(now);
177 }
178 }
179 }
180 }
181 };
182}